Home | History | Annotate | Download | only in grpc
      1 /*
      2  *
      3  * Copyright 2014 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 package grpc
     20 
     21 import (
     22 	"bytes"
     23 	"errors"
     24 	"fmt"
     25 	"io"
     26 	"math"
     27 	"net"
     28 	"net/http"
     29 	"reflect"
     30 	"runtime"
     31 	"strings"
     32 	"sync"
     33 	"time"
     34 
     35 	"io/ioutil"
     36 
     37 	"golang.org/x/net/context"
     38 	"golang.org/x/net/http2"
     39 	"golang.org/x/net/trace"
     40 
     41 	"google.golang.org/grpc/codes"
     42 	"google.golang.org/grpc/credentials"
     43 	"google.golang.org/grpc/encoding"
     44 	"google.golang.org/grpc/encoding/proto"
     45 	"google.golang.org/grpc/grpclog"
     46 	"google.golang.org/grpc/internal"
     47 	"google.golang.org/grpc/internal/channelz"
     48 	"google.golang.org/grpc/keepalive"
     49 	"google.golang.org/grpc/metadata"
     50 	"google.golang.org/grpc/stats"
     51 	"google.golang.org/grpc/status"
     52 	"google.golang.org/grpc/tap"
     53 	"google.golang.org/grpc/transport"
     54 )
     55 
// Default per-message size limits, in bytes, installed by
// defaultServerOptions when the caller does not override them.
const (
	// 4 MiB cap on a single received message.
	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
	// Largest int32 value as the cap on a single sent message.
	defaultServerMaxSendMessageSize    = math.MaxInt32
)
     60 
// methodHandler is the function type stored in MethodDesc.Handler. It receives
// the service implementation (srv), the call context, a dec callback that
// takes a destination value and returns an error, and the configured
// UnaryServerInterceptor; it returns a value and an error.
// NOTE(review): presumably dec unmarshals the request into its argument and
// the returned value is the response message -- confirm against the caller.
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
     62 
// MethodDesc represents an RPC service's method specification.
// MethodName is the key under which the method is stored when the owning
// service is registered; Handler is the function associated with that name.
type MethodDesc struct {
	MethodName string
	Handler    methodHandler
}
     68 
// ServiceDesc represents an RPC service's specification.
// Methods and Streams are indexed by name when the service is registered,
// and Metadata is carried through unchanged to ServiceInfo.Metadata.
type ServiceDesc struct {
	ServiceName string
	// The pointer to the service interface. Used to check whether the user
	// provided implementation satisfies the interface requirements.
	HandlerType interface{}
	Methods     []MethodDesc
	Streams     []StreamDesc
	Metadata    interface{}
}
     79 
// service consists of the information of the server serving this service and
// the methods in this service.
// md and sd map method and stream names to their descriptors; mdata is the
// Metadata value copied from the ServiceDesc at registration time.
type service struct {
	server interface{} // the server for service methods
	md     map[string]*MethodDesc
	sd     map[string]*StreamDesc
	mdata  interface{}
}
     88 
// Server is a gRPC server to serve RPC requests.
//
// Fields listed after mu are guarded by mu: lis holds the listeners added by
// Serve, conns holds the connections added by addConn, serve is set once Serve
// has been called, drain makes addConn drain newly added transports, m maps
// registered service names to their descriptors, and events is the optional
// trace event log written by printf and errorf. The call counters and
// lastCallStartedTime are guarded separately by czmu and exposed through
// ChannelzMetric.
type Server struct {
	opts options

	mu     sync.Mutex // guards following
	lis    map[net.Listener]bool
	conns  map[io.Closer]bool
	serve  bool
	drain  bool
	cv     *sync.Cond          // signaled when connections close for GracefulStop
	m      map[string]*service // service name -> service info
	events trace.EventLog

	quit               chan struct{}
	done               chan struct{}
	quitOnce           sync.Once
	doneOnce           sync.Once
	channelzRemoveOnce sync.Once
	serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

	channelzID          int64 // channelz unique identification number
	czmu                sync.RWMutex
	callsStarted        int64
	callsFailed         int64
	callsSucceeded      int64
	lastCallStartedTime time.Time
}
    116 
// options collects every setting a ServerOption can modify. NewServer starts
// from defaultServerOptions, applies the caller's ServerOptions in order and
// stores the result in Server.opts. Each field is written by the exported
// option function of the corresponding name (for example writeBufferSize by
// WriteBufferSize and unknownStreamDesc by UnknownServiceHandler).
type options struct {
	creds                 credentials.TransportCredentials
	codec                 baseCodec
	cp                    Compressor
	dc                    Decompressor
	unaryInt              UnaryServerInterceptor
	streamInt             StreamServerInterceptor
	inTapHandle           tap.ServerInHandle
	statsHandler          stats.Handler
	maxConcurrentStreams  uint32
	maxReceiveMessageSize int
	maxSendMessageSize    int
	useHandlerImpl        bool // use http.Handler-based server
	unknownStreamDesc     *StreamDesc
	keepaliveParams       keepalive.ServerParameters
	keepalivePolicy       keepalive.EnforcementPolicy
	initialWindowSize     int32
	initialConnWindowSize int32
	writeBufferSize       int
	readBufferSize        int
	connectionTimeout     time.Duration
}
    139 
// defaultServerOptions is the baseline that NewServer copies before applying
// caller-supplied ServerOptions: a 4 MiB receive limit, a math.MaxInt32 send
// limit and a 120 second connection-establishment timeout. All other fields
// start at their zero values.
var defaultServerOptions = options{
	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
	maxSendMessageSize:    defaultServerMaxSendMessageSize,
	connectionTimeout:     120 * time.Second,
}
    145 
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
// It is a function that mutates the options value handed to it; NewServer
// calls each supplied ServerOption in order on its working copy.
type ServerOption func(*options)
    148 
    149 // WriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
    150 // before doing a write on the wire.
    151 func WriteBufferSize(s int) ServerOption {
    152 	return func(o *options) {
    153 		o.writeBufferSize = s
    154 	}
    155 }
    156 
    157 // ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
    158 // for one read syscall.
    159 func ReadBufferSize(s int) ServerOption {
    160 	return func(o *options) {
    161 		o.readBufferSize = s
    162 	}
    163 }
    164 
    165 // InitialWindowSize returns a ServerOption that sets window size for stream.
    166 // The lower bound for window size is 64K and any value smaller than that will be ignored.
    167 func InitialWindowSize(s int32) ServerOption {
    168 	return func(o *options) {
    169 		o.initialWindowSize = s
    170 	}
    171 }
    172 
    173 // InitialConnWindowSize returns a ServerOption that sets window size for a connection.
    174 // The lower bound for window size is 64K and any value smaller than that will be ignored.
    175 func InitialConnWindowSize(s int32) ServerOption {
    176 	return func(o *options) {
    177 		o.initialConnWindowSize = s
    178 	}
    179 }
    180 
    181 // KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
    182 func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
    183 	return func(o *options) {
    184 		o.keepaliveParams = kp
    185 	}
    186 }
    187 
    188 // KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
    189 func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
    190 	return func(o *options) {
    191 		o.keepalivePolicy = kep
    192 	}
    193 }
    194 
    195 // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
    196 //
    197 // This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
    198 func CustomCodec(codec Codec) ServerOption {
    199 	return func(o *options) {
    200 		o.codec = codec
    201 	}
    202 }
    203 
    204 // RPCCompressor returns a ServerOption that sets a compressor for outbound
    205 // messages.  For backward compatibility, all outbound messages will be sent
    206 // using this compressor, regardless of incoming message compression.  By
    207 // default, server messages will be sent using the same compressor with which
    208 // request messages were sent.
    209 //
    210 // Deprecated: use encoding.RegisterCompressor instead.
    211 func RPCCompressor(cp Compressor) ServerOption {
    212 	return func(o *options) {
    213 		o.cp = cp
    214 	}
    215 }
    216 
    217 // RPCDecompressor returns a ServerOption that sets a decompressor for inbound
    218 // messages.  It has higher priority than decompressors registered via
    219 // encoding.RegisterCompressor.
    220 //
    221 // Deprecated: use encoding.RegisterCompressor instead.
    222 func RPCDecompressor(dc Decompressor) ServerOption {
    223 	return func(o *options) {
    224 		o.dc = dc
    225 	}
    226 }
    227 
    228 // MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
    229 // If this is not set, gRPC uses the default limit.
    230 //
    231 // Deprecated: use MaxRecvMsgSize instead.
    232 func MaxMsgSize(m int) ServerOption {
    233 	return MaxRecvMsgSize(m)
    234 }
    235 
    236 // MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
    237 // If this is not set, gRPC uses the default 4MB.
    238 func MaxRecvMsgSize(m int) ServerOption {
    239 	return func(o *options) {
    240 		o.maxReceiveMessageSize = m
    241 	}
    242 }
    243 
// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
// If this is not set, gRPC uses the default math.MaxInt32 (the send limit in
// defaultServerOptions is not the 4MB used for received messages).
func MaxSendMsgSize(m int) ServerOption {
	return func(o *options) {
		o.maxSendMessageSize = m
	}
}
    251 
    252 // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
    253 // of concurrent streams to each ServerTransport.
    254 func MaxConcurrentStreams(n uint32) ServerOption {
    255 	return func(o *options) {
    256 		o.maxConcurrentStreams = n
    257 	}
    258 }
    259 
    260 // Creds returns a ServerOption that sets credentials for server connections.
    261 func Creds(c credentials.TransportCredentials) ServerOption {
    262 	return func(o *options) {
    263 		o.creds = c
    264 	}
    265 }
    266 
    267 // UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
    268 // server. Only one unary interceptor can be installed. The construction of multiple
    269 // interceptors (e.g., chaining) can be implemented at the caller.
    270 func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
    271 	return func(o *options) {
    272 		if o.unaryInt != nil {
    273 			panic("The unary server interceptor was already set and may not be reset.")
    274 		}
    275 		o.unaryInt = i
    276 	}
    277 }
    278 
    279 // StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
    280 // server. Only one stream interceptor can be installed.
    281 func StreamInterceptor(i StreamServerInterceptor) ServerOption {
    282 	return func(o *options) {
    283 		if o.streamInt != nil {
    284 			panic("The stream server interceptor was already set and may not be reset.")
    285 		}
    286 		o.streamInt = i
    287 	}
    288 }
    289 
    290 // InTapHandle returns a ServerOption that sets the tap handle for all the server
    291 // transport to be created. Only one can be installed.
    292 func InTapHandle(h tap.ServerInHandle) ServerOption {
    293 	return func(o *options) {
    294 		if o.inTapHandle != nil {
    295 			panic("The tap handle was already set and may not be reset.")
    296 		}
    297 		o.inTapHandle = h
    298 	}
    299 }
    300 
    301 // StatsHandler returns a ServerOption that sets the stats handler for the server.
    302 func StatsHandler(h stats.Handler) ServerOption {
    303 	return func(o *options) {
    304 		o.statsHandler = h
    305 	}
    306 }
    307 
    308 // UnknownServiceHandler returns a ServerOption that allows for adding a custom
    309 // unknown service handler. The provided method is a bidi-streaming RPC service
    310 // handler that will be invoked instead of returning the "unimplemented" gRPC
    311 // error whenever a request is received for an unregistered service or method.
    312 // The handling function has full access to the Context of the request and the
    313 // stream, and the invocation bypasses interceptors.
    314 func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
    315 	return func(o *options) {
    316 		o.unknownStreamDesc = &StreamDesc{
    317 			StreamName: "unknown_service_handler",
    318 			Handler:    streamHandler,
    319 			// We need to assume that the users of the streamHandler will want to use both.
    320 			ClientStreams: true,
    321 			ServerStreams: true,
    322 		}
    323 	}
    324 }
    325 
    326 // ConnectionTimeout returns a ServerOption that sets the timeout for
    327 // connection establishment (up to and including HTTP/2 handshaking) for all
    328 // new connections.  If this is not set, the default is 120 seconds.  A zero or
    329 // negative value will result in an immediate timeout.
    330 //
    331 // This API is EXPERIMENTAL.
    332 func ConnectionTimeout(d time.Duration) ServerOption {
    333 	return func(o *options) {
    334 		o.connectionTimeout = d
    335 	}
    336 }
    337 
    338 // NewServer creates a gRPC server which has no service registered and has not
    339 // started to accept requests yet.
    340 func NewServer(opt ...ServerOption) *Server {
    341 	opts := defaultServerOptions
    342 	for _, o := range opt {
    343 		o(&opts)
    344 	}
    345 	s := &Server{
    346 		lis:   make(map[net.Listener]bool),
    347 		opts:  opts,
    348 		conns: make(map[io.Closer]bool),
    349 		m:     make(map[string]*service),
    350 		quit:  make(chan struct{}),
    351 		done:  make(chan struct{}),
    352 	}
    353 	s.cv = sync.NewCond(&s.mu)
    354 	if EnableTracing {
    355 		_, file, line, _ := runtime.Caller(1)
    356 		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
    357 	}
    358 
    359 	if channelz.IsOn() {
    360 		s.channelzID = channelz.RegisterServer(s, "")
    361 	}
    362 	return s
    363 }
    364 
    365 // printf records an event in s's event log, unless s has been stopped.
    366 // REQUIRES s.mu is held.
    367 func (s *Server) printf(format string, a ...interface{}) {
    368 	if s.events != nil {
    369 		s.events.Printf(format, a...)
    370 	}
    371 }
    372 
    373 // errorf records an error in s's event log, unless s has been stopped.
    374 // REQUIRES s.mu is held.
    375 func (s *Server) errorf(format string, a ...interface{}) {
    376 	if s.events != nil {
    377 		s.events.Errorf(format, a...)
    378 	}
    379 }
    380 
    381 // RegisterService registers a service and its implementation to the gRPC
    382 // server. It is called from the IDL generated code. This must be called before
    383 // invoking Serve.
    384 func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
    385 	ht := reflect.TypeOf(sd.HandlerType).Elem()
    386 	st := reflect.TypeOf(ss)
    387 	if !st.Implements(ht) {
    388 		grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
    389 	}
    390 	s.register(sd, ss)
    391 }
    392 
    393 func (s *Server) register(sd *ServiceDesc, ss interface{}) {
    394 	s.mu.Lock()
    395 	defer s.mu.Unlock()
    396 	s.printf("RegisterService(%q)", sd.ServiceName)
    397 	if s.serve {
    398 		grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
    399 	}
    400 	if _, ok := s.m[sd.ServiceName]; ok {
    401 		grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
    402 	}
    403 	srv := &service{
    404 		server: ss,
    405 		md:     make(map[string]*MethodDesc),
    406 		sd:     make(map[string]*StreamDesc),
    407 		mdata:  sd.Metadata,
    408 	}
    409 	for i := range sd.Methods {
    410 		d := &sd.Methods[i]
    411 		srv.md[d.MethodName] = d
    412 	}
    413 	for i := range sd.Streams {
    414 		d := &sd.Streams[i]
    415 		srv.sd[d.StreamName] = d
    416 	}
    417 	s.m[sd.ServiceName] = srv
    418 }
    419 
// MethodInfo contains the information of an RPC including its method name and type.
// GetServiceInfo produces one MethodInfo per registered unary method (both
// stream flags false) and one per registered stream descriptor.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string
	// IsClientStream indicates whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream indicates whether the RPC is a server streaming RPC.
	IsServerStream bool
}
    429 
// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
// Methods lists the unary and the streaming methods together in a single slice.
type ServiceInfo struct {
	Methods []MethodInfo
	// Metadata is the metadata specified in ServiceDesc when registering service.
	Metadata interface{}
}
    436 
    437 // GetServiceInfo returns a map from service names to ServiceInfo.
    438 // Service names include the package names, in the form of <package>.<service>.
    439 func (s *Server) GetServiceInfo() map[string]ServiceInfo {
    440 	ret := make(map[string]ServiceInfo)
    441 	for n, srv := range s.m {
    442 		methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
    443 		for m := range srv.md {
    444 			methods = append(methods, MethodInfo{
    445 				Name:           m,
    446 				IsClientStream: false,
    447 				IsServerStream: false,
    448 			})
    449 		}
    450 		for m, d := range srv.sd {
    451 			methods = append(methods, MethodInfo{
    452 				Name:           m,
    453 				IsClientStream: d.ClientStreams,
    454 				IsServerStream: d.ServerStreams,
    455 			})
    456 		}
    457 
    458 		ret[n] = ServiceInfo{
    459 			Methods:  methods,
    460 			Metadata: srv.mdata,
    461 		}
    462 	}
    463 	return ret
    464 }
    465 
// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped.
// Serve returns it when it finds that the server's listener set is nil.
var ErrServerStopped = errors.New("grpc: the server has been stopped")
    469 
    470 func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
    471 	if s.opts.creds == nil {
    472 		return rawConn, nil, nil
    473 	}
    474 	return s.opts.creds.ServerHandshake(rawConn)
    475 }
    476 
// listenSocket wraps a net.Listener together with the channelz ID assigned to
// it, so that Close can also remove the listener's channelz entry. Serve
// stores listeners in s.lis in this wrapped form.
type listenSocket struct {
	net.Listener
	channelzID int64
}
    481 
    482 func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
    483 	return &channelz.SocketInternalMetric{
    484 		SocketOptions: channelz.GetSocketOption(l.Listener),
    485 		LocalAddr:     l.Listener.Addr(),
    486 	}
    487 }
    488 
    489 func (l *listenSocket) Close() error {
    490 	err := l.Listener.Close()
    491 	if channelz.IsOn() {
    492 		channelz.RemoveEntry(l.channelzID)
    493 	}
    494 	return err
    495 }
    496 
// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
//
// If s.lis is already nil when Serve is entered, lis is closed immediately
// and ErrServerStopped is returned. Accept errors that report Temporary()
// are retried after a delay that starts at 5ms, doubles on each consecutive
// failure and is capped at 1s; the wait is abandoned (returning nil) as soon
// as s.quit is closed.
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	// Registered first, so this deferred function runs last: after the
	// listener clean-up below has finished.
	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		select {
		// Stop or GracefulStop called; block until done and return nil.
		case <-s.quit:
			<-s.done
		default:
		}
	}()

	// Track the listener in its wrapped form so closing it also removes its
	// channelz entry.
	ls := &listenSocket{Listener: lis}
	s.lis[ls] = true

	if channelz.IsOn() {
		ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, "")
	}
	s.mu.Unlock()

	// On return, close and forget the listener unless it has already been
	// removed from s.lis (or s.lis itself has been set to nil).
	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	var tempDelay time.Duration // how long to sleep on accept failure

	for {
		rawConn, err := lis.Accept()
		if err != nil {
			// Temporary errors are retried with exponential backoff.
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				// Sleep for tempDelay, but wake up early if s.quit is closed.
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit:
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			// A non-temporary Accept error is reported as nil when s.quit is
			// already closed, and returned to the caller otherwise.
			select {
			case <-s.quit:
				return nil
			default:
			}
			return err
		}
		// A successful Accept resets the backoff.
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(rawConn)
			s.serveWG.Done()
		}()
	}
}
    594 
    595 // handleRawConn forks a goroutine to handle a just-accepted connection that
    596 // has not had any I/O performed on it yet.
    597 func (s *Server) handleRawConn(rawConn net.Conn) {
    598 	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
    599 	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
    600 	if err != nil {
    601 		s.mu.Lock()
    602 		s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
    603 		s.mu.Unlock()
    604 		grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
    605 		// If serverHandshake returns ErrConnDispatched, keep rawConn open.
    606 		if err != credentials.ErrConnDispatched {
    607 			rawConn.Close()
    608 		}
    609 		rawConn.SetDeadline(time.Time{})
    610 		return
    611 	}
    612 
    613 	s.mu.Lock()
    614 	if s.conns == nil {
    615 		s.mu.Unlock()
    616 		conn.Close()
    617 		return
    618 	}
    619 	s.mu.Unlock()
    620 
    621 	var serve func()
    622 	c := conn.(io.Closer)
    623 	if s.opts.useHandlerImpl {
    624 		serve = func() { s.serveUsingHandler(conn) }
    625 	} else {
    626 		// Finish handshaking (HTTP2)
    627 		st := s.newHTTP2Transport(conn, authInfo)
    628 		if st == nil {
    629 			return
    630 		}
    631 		c = st
    632 		serve = func() { s.serveStreams(st) }
    633 	}
    634 
    635 	rawConn.SetDeadline(time.Time{})
    636 	if !s.addConn(c) {
    637 		return
    638 	}
    639 	go func() {
    640 		serve()
    641 		s.removeConn(c)
    642 	}()
    643 }
    644 
    645 // newHTTP2Transport sets up a http/2 transport (using the
    646 // gRPC http2 server transport in transport/http2_server.go).
    647 func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
    648 	config := &transport.ServerConfig{
    649 		MaxStreams:            s.opts.maxConcurrentStreams,
    650 		AuthInfo:              authInfo,
    651 		InTapHandle:           s.opts.inTapHandle,
    652 		StatsHandler:          s.opts.statsHandler,
    653 		KeepaliveParams:       s.opts.keepaliveParams,
    654 		KeepalivePolicy:       s.opts.keepalivePolicy,
    655 		InitialWindowSize:     s.opts.initialWindowSize,
    656 		InitialConnWindowSize: s.opts.initialConnWindowSize,
    657 		WriteBufferSize:       s.opts.writeBufferSize,
    658 		ReadBufferSize:        s.opts.readBufferSize,
    659 		ChannelzParentID:      s.channelzID,
    660 	}
    661 	st, err := transport.NewServerTransport("http2", c, config)
    662 	if err != nil {
    663 		s.mu.Lock()
    664 		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
    665 		s.mu.Unlock()
    666 		c.Close()
    667 		grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
    668 		return nil
    669 	}
    670 
    671 	return st
    672 }
    673 
    674 func (s *Server) serveStreams(st transport.ServerTransport) {
    675 	defer st.Close()
    676 	var wg sync.WaitGroup
    677 	st.HandleStreams(func(stream *transport.Stream) {
    678 		wg.Add(1)
    679 		go func() {
    680 			defer wg.Done()
    681 			s.handleStream(st, stream, s.traceInfo(st, stream))
    682 		}()
    683 	}, func(ctx context.Context, method string) context.Context {
    684 		if !EnableTracing {
    685 			return ctx
    686 		}
    687 		tr := trace.New("grpc.Recv."+methodFamily(method), method)
    688 		return trace.NewContext(ctx, tr)
    689 	})
    690 	wg.Wait()
    691 }
    692 
// Compile-time assertion that *Server implements http.Handler.
var _ http.Handler = (*Server)(nil)
    694 
    695 // serveUsingHandler is called from handleRawConn when s is configured
    696 // to handle requests via the http.Handler interface. It sets up a
    697 // net/http.Server to handle the just-accepted conn. The http.Server
    698 // is configured to route all incoming requests (all HTTP/2 streams)
    699 // to ServeHTTP, which creates a new ServerTransport for each stream.
    700 // serveUsingHandler blocks until conn closes.
    701 //
    702 // This codepath is only used when Server.TestingUseHandlerImpl has
    703 // been configured. This lets the end2end tests exercise the ServeHTTP
    704 // method as one of the environment types.
    705 //
    706 // conn is the *tls.Conn that's already been authenticated.
    707 func (s *Server) serveUsingHandler(conn net.Conn) {
    708 	h2s := &http2.Server{
    709 		MaxConcurrentStreams: s.opts.maxConcurrentStreams,
    710 	}
    711 	h2s.ServeConn(conn, &http2.ServeConnOpts{
    712 		Handler: s,
    713 	})
    714 }
    715 
    716 // ServeHTTP implements the Go standard library's http.Handler
    717 // interface by responding to the gRPC request r, by looking up
    718 // the requested gRPC method in the gRPC server s.
    719 //
    720 // The provided HTTP request must have arrived on an HTTP/2
    721 // connection. When using the Go standard library's server,
    722 // practically this means that the Request must also have arrived
    723 // over TLS.
    724 //
    725 // To share one port (such as 443 for https) between gRPC and an
    726 // existing http.Handler, use a root http.Handler such as:
    727 //
    728 //   if r.ProtoMajor == 2 && strings.HasPrefix(
    729 //   	r.Header.Get("Content-Type"), "application/grpc") {
    730 //   	grpcServer.ServeHTTP(w, r)
    731 //   } else {
    732 //   	yourMux.ServeHTTP(w, r)
    733 //   }
    734 //
    735 // Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
    736 // separate from grpc-go's HTTP/2 server. Performance and features may vary
    737 // between the two paths. ServeHTTP does not support some gRPC features
    738 // available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
    739 // and subject to change.
    740 func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    741 	st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
    742 	if err != nil {
    743 		http.Error(w, err.Error(), http.StatusInternalServerError)
    744 		return
    745 	}
    746 	if !s.addConn(st) {
    747 		return
    748 	}
    749 	defer s.removeConn(st)
    750 	s.serveStreams(st)
    751 }
    752 
    753 // traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
    754 // If tracing is not enabled, it returns nil.
    755 func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
    756 	tr, ok := trace.FromContext(stream.Context())
    757 	if !ok {
    758 		return nil
    759 	}
    760 
    761 	trInfo = &traceInfo{
    762 		tr: tr,
    763 	}
    764 	trInfo.firstLine.client = false
    765 	trInfo.firstLine.remoteAddr = st.RemoteAddr()
    766 
    767 	if dl, ok := stream.Context().Deadline(); ok {
    768 		trInfo.firstLine.deadline = dl.Sub(time.Now())
    769 	}
    770 	return trInfo
    771 }
    772 
    773 func (s *Server) addConn(c io.Closer) bool {
    774 	s.mu.Lock()
    775 	defer s.mu.Unlock()
    776 	if s.conns == nil {
    777 		c.Close()
    778 		return false
    779 	}
    780 	if s.drain {
    781 		// Transport added after we drained our existing conns: drain it
    782 		// immediately.
    783 		c.(transport.ServerTransport).Drain()
    784 	}
    785 	s.conns[c] = true
    786 	return true
    787 }
    788 
    789 func (s *Server) removeConn(c io.Closer) {
    790 	s.mu.Lock()
    791 	defer s.mu.Unlock()
    792 	if s.conns != nil {
    793 		delete(s.conns, c)
    794 		s.cv.Broadcast()
    795 	}
    796 }
    797 
    798 // ChannelzMetric returns ServerInternalMetric of current server.
    799 // This is an EXPERIMENTAL API.
    800 func (s *Server) ChannelzMetric() *channelz.ServerInternalMetric {
    801 	s.czmu.RLock()
    802 	defer s.czmu.RUnlock()
    803 	return &channelz.ServerInternalMetric{
    804 		CallsStarted:             s.callsStarted,
    805 		CallsSucceeded:           s.callsSucceeded,
    806 		CallsFailed:              s.callsFailed,
    807 		LastCallStartedTimestamp: s.lastCallStartedTime,
    808 	}
    809 }
    810 
    811 func (s *Server) incrCallsStarted() {
    812 	s.czmu.Lock()
    813 	s.callsStarted++
    814 	s.lastCallStartedTime = time.Now()
    815 	s.czmu.Unlock()
    816 }
    817 
    818 func (s *Server) incrCallsSucceeded() {
    819 	s.czmu.Lock()
    820 	s.callsSucceeded++
    821 	s.czmu.Unlock()
    822 }
    823 
    824 func (s *Server) incrCallsFailed() {
    825 	s.czmu.Lock()
    826 	s.callsFailed++
    827 	s.czmu.Unlock()
    828 }
    829 
    830 func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
    831 	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
    832 	if err != nil {
    833 		grpclog.Errorln("grpc: server failed to encode response: ", err)
    834 		return err
    835 	}
    836 	compData, err := compress(data, cp, comp)
    837 	if err != nil {
    838 		grpclog.Errorln("grpc: server failed to compress response: ", err)
    839 		return err
    840 	}
    841 	hdr, payload := msgHeader(data, compData)
    842 	// TODO(dfawley): should we be checking len(data) instead?
    843 	if len(payload) > s.opts.maxSendMessageSize {
    844 		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
    845 	}
    846 	err = t.Write(stream, hdr, payload, opts)
    847 	if err == nil && s.opts.statsHandler != nil {
    848 		s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
    849 	}
    850 	return err
    851 }
    852 
    853 func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
    854 	if channelz.IsOn() {
    855 		s.incrCallsStarted()
    856 		defer func() {
    857 			if err != nil && err != io.EOF {
    858 				s.incrCallsFailed()
    859 			} else {
    860 				s.incrCallsSucceeded()
    861 			}
    862 		}()
    863 	}
    864 	sh := s.opts.statsHandler
    865 	if sh != nil {
    866 		beginTime := time.Now()
    867 		begin := &stats.Begin{
    868 			BeginTime: beginTime,
    869 		}
    870 		sh.HandleRPC(stream.Context(), begin)
    871 		defer func() {
    872 			end := &stats.End{
    873 				BeginTime: beginTime,
    874 				EndTime:   time.Now(),
    875 			}
    876 			if err != nil && err != io.EOF {
    877 				end.Error = toRPCErr(err)
    878 			}
    879 			sh.HandleRPC(stream.Context(), end)
    880 		}()
    881 	}
    882 	if trInfo != nil {
    883 		defer trInfo.tr.Finish()
    884 		trInfo.firstLine.client = false
    885 		trInfo.tr.LazyLog(&trInfo.firstLine, false)
    886 		defer func() {
    887 			if err != nil && err != io.EOF {
    888 				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
    889 				trInfo.tr.SetError()
    890 			}
    891 		}()
    892 	}
    893 
    894 	// comp and cp are used for compression.  decomp and dc are used for
    895 	// decompression.  If comp and decomp are both set, they are the same;
    896 	// however they are kept separate to ensure that at most one of the
    897 	// compressor/decompressor variable pairs are set for use later.
    898 	var comp, decomp encoding.Compressor
    899 	var cp Compressor
    900 	var dc Decompressor
    901 
    902 	// If dc is set and matches the stream's compression, use it.  Otherwise, try
    903 	// to find a matching registered compressor for decomp.
    904 	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
    905 		dc = s.opts.dc
    906 	} else if rc != "" && rc != encoding.Identity {
    907 		decomp = encoding.GetCompressor(rc)
    908 		if decomp == nil {
    909 			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
    910 			t.WriteStatus(stream, st)
    911 			return st.Err()
    912 		}
    913 	}
    914 
    915 	// If cp is set, use it.  Otherwise, attempt to compress the response using
    916 	// the incoming message compression method.
    917 	//
    918 	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
    919 	if s.opts.cp != nil {
    920 		cp = s.opts.cp
    921 		stream.SetSendCompress(cp.Type())
    922 	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
    923 		// Legacy compressor not specified; attempt to respond with same encoding.
    924 		comp = encoding.GetCompressor(rc)
    925 		if comp != nil {
    926 			stream.SetSendCompress(rc)
    927 		}
    928 	}
    929 
    930 	p := &parser{r: stream}
    931 	pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize)
    932 	if err == io.EOF {
    933 		// The entire stream is done (for unary RPC only).
    934 		return err
    935 	}
    936 	if err == io.ErrUnexpectedEOF {
    937 		err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
    938 	}
    939 	if err != nil {
    940 		if st, ok := status.FromError(err); ok {
    941 			if e := t.WriteStatus(stream, st); e != nil {
    942 				grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
    943 			}
    944 		} else {
    945 			switch st := err.(type) {
    946 			case transport.ConnectionError:
    947 				// Nothing to do here.
    948 			case transport.StreamError:
    949 				if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
    950 					grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
    951 				}
    952 			default:
    953 				panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st))
    954 			}
    955 		}
    956 		return err
    957 	}
    958 	if channelz.IsOn() {
    959 		t.IncrMsgRecv()
    960 	}
    961 	if st := checkRecvPayload(pf, stream.RecvCompress(), dc != nil || decomp != nil); st != nil {
    962 		if e := t.WriteStatus(stream, st); e != nil {
    963 			grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
    964 		}
    965 		return st.Err()
    966 	}
    967 	var inPayload *stats.InPayload
    968 	if sh != nil {
    969 		inPayload = &stats.InPayload{
    970 			RecvTime: time.Now(),
    971 		}
    972 	}
    973 	df := func(v interface{}) error {
    974 		if inPayload != nil {
    975 			inPayload.WireLength = len(req)
    976 		}
    977 		if pf == compressionMade {
    978 			var err error
    979 			if dc != nil {
    980 				req, err = dc.Do(bytes.NewReader(req))
    981 				if err != nil {
    982 					return status.Errorf(codes.Internal, err.Error())
    983 				}
    984 			} else {
    985 				tmp, _ := decomp.Decompress(bytes.NewReader(req))
    986 				req, err = ioutil.ReadAll(tmp)
    987 				if err != nil {
    988 					return status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
    989 				}
    990 			}
    991 		}
    992 		if len(req) > s.opts.maxReceiveMessageSize {
    993 			// TODO: Revisit the error code. Currently keep it consistent with
    994 			// java implementation.
    995 			return status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(req), s.opts.maxReceiveMessageSize)
    996 		}
    997 		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(req, v); err != nil {
    998 			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
    999 		}
   1000 		if inPayload != nil {
   1001 			inPayload.Payload = v
   1002 			inPayload.Data = req
   1003 			inPayload.Length = len(req)
   1004 			sh.HandleRPC(stream.Context(), inPayload)
   1005 		}
   1006 		if trInfo != nil {
   1007 			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
   1008 		}
   1009 		return nil
   1010 	}
   1011 	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
   1012 	reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
   1013 	if appErr != nil {
   1014 		appStatus, ok := status.FromError(appErr)
   1015 		if !ok {
   1016 			// Convert appErr if it is not a grpc status error.
   1017 			appErr = status.Error(codes.Unknown, appErr.Error())
   1018 			appStatus, _ = status.FromError(appErr)
   1019 		}
   1020 		if trInfo != nil {
   1021 			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
   1022 			trInfo.tr.SetError()
   1023 		}
   1024 		if e := t.WriteStatus(stream, appStatus); e != nil {
   1025 			grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
   1026 		}
   1027 		return appErr
   1028 	}
   1029 	if trInfo != nil {
   1030 		trInfo.tr.LazyLog(stringer("OK"), false)
   1031 	}
   1032 	opts := &transport.Options{
   1033 		Last:  true,
   1034 		Delay: false,
   1035 	}
   1036 
   1037 	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
   1038 		if err == io.EOF {
   1039 			// The entire stream is done (for unary RPC only).
   1040 			return err
   1041 		}
   1042 		if s, ok := status.FromError(err); ok {
   1043 			if e := t.WriteStatus(stream, s); e != nil {
   1044 				grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
   1045 			}
   1046 		} else {
   1047 			switch st := err.(type) {
   1048 			case transport.ConnectionError:
   1049 				// Nothing to do here.
   1050 			case transport.StreamError:
   1051 				if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
   1052 					grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
   1053 				}
   1054 			default:
   1055 				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
   1056 			}
   1057 		}
   1058 		return err
   1059 	}
   1060 	if channelz.IsOn() {
   1061 		t.IncrMsgSent()
   1062 	}
   1063 	if trInfo != nil {
   1064 		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
   1065 	}
   1066 	// TODO: Should we be logging if writing status failed here, like above?
   1067 	// Should the logging be in WriteStatus?  Should we ignore the WriteStatus
   1068 	// error or allow the stats handler to see it?
   1069 	return t.WriteStatus(stream, status.New(codes.OK, ""))
   1070 }
   1071 
   1072 func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
   1073 	if channelz.IsOn() {
   1074 		s.incrCallsStarted()
   1075 		defer func() {
   1076 			if err != nil && err != io.EOF {
   1077 				s.incrCallsFailed()
   1078 			} else {
   1079 				s.incrCallsSucceeded()
   1080 			}
   1081 		}()
   1082 	}
   1083 	sh := s.opts.statsHandler
   1084 	if sh != nil {
   1085 		beginTime := time.Now()
   1086 		begin := &stats.Begin{
   1087 			BeginTime: beginTime,
   1088 		}
   1089 		sh.HandleRPC(stream.Context(), begin)
   1090 		defer func() {
   1091 			end := &stats.End{
   1092 				BeginTime: beginTime,
   1093 				EndTime:   time.Now(),
   1094 			}
   1095 			if err != nil && err != io.EOF {
   1096 				end.Error = toRPCErr(err)
   1097 			}
   1098 			sh.HandleRPC(stream.Context(), end)
   1099 		}()
   1100 	}
   1101 	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
   1102 	ss := &serverStream{
   1103 		ctx:   ctx,
   1104 		t:     t,
   1105 		s:     stream,
   1106 		p:     &parser{r: stream},
   1107 		codec: s.getCodec(stream.ContentSubtype()),
   1108 		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
   1109 		maxSendMessageSize:    s.opts.maxSendMessageSize,
   1110 		trInfo:                trInfo,
   1111 		statsHandler:          sh,
   1112 	}
   1113 
   1114 	// If dc is set and matches the stream's compression, use it.  Otherwise, try
   1115 	// to find a matching registered compressor for decomp.
   1116 	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
   1117 		ss.dc = s.opts.dc
   1118 	} else if rc != "" && rc != encoding.Identity {
   1119 		ss.decomp = encoding.GetCompressor(rc)
   1120 		if ss.decomp == nil {
   1121 			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
   1122 			t.WriteStatus(ss.s, st)
   1123 			return st.Err()
   1124 		}
   1125 	}
   1126 
   1127 	// If cp is set, use it.  Otherwise, attempt to compress the response using
   1128 	// the incoming message compression method.
   1129 	//
   1130 	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
   1131 	if s.opts.cp != nil {
   1132 		ss.cp = s.opts.cp
   1133 		stream.SetSendCompress(s.opts.cp.Type())
   1134 	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
   1135 		// Legacy compressor not specified; attempt to respond with same encoding.
   1136 		ss.comp = encoding.GetCompressor(rc)
   1137 		if ss.comp != nil {
   1138 			stream.SetSendCompress(rc)
   1139 		}
   1140 	}
   1141 
   1142 	if trInfo != nil {
   1143 		trInfo.tr.LazyLog(&trInfo.firstLine, false)
   1144 		defer func() {
   1145 			ss.mu.Lock()
   1146 			if err != nil && err != io.EOF {
   1147 				ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
   1148 				ss.trInfo.tr.SetError()
   1149 			}
   1150 			ss.trInfo.tr.Finish()
   1151 			ss.trInfo.tr = nil
   1152 			ss.mu.Unlock()
   1153 		}()
   1154 	}
   1155 	var appErr error
   1156 	var server interface{}
   1157 	if srv != nil {
   1158 		server = srv.server
   1159 	}
   1160 	if s.opts.streamInt == nil {
   1161 		appErr = sd.Handler(server, ss)
   1162 	} else {
   1163 		info := &StreamServerInfo{
   1164 			FullMethod:     stream.Method(),
   1165 			IsClientStream: sd.ClientStreams,
   1166 			IsServerStream: sd.ServerStreams,
   1167 		}
   1168 		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
   1169 	}
   1170 	if appErr != nil {
   1171 		appStatus, ok := status.FromError(appErr)
   1172 		if !ok {
   1173 			switch err := appErr.(type) {
   1174 			case transport.StreamError:
   1175 				appStatus = status.New(err.Code, err.Desc)
   1176 			default:
   1177 				appStatus = status.New(codes.Unknown, appErr.Error())
   1178 			}
   1179 			appErr = appStatus.Err()
   1180 		}
   1181 		if trInfo != nil {
   1182 			ss.mu.Lock()
   1183 			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
   1184 			ss.trInfo.tr.SetError()
   1185 			ss.mu.Unlock()
   1186 		}
   1187 		t.WriteStatus(ss.s, appStatus)
   1188 		// TODO: Should we log an error from WriteStatus here and below?
   1189 		return appErr
   1190 	}
   1191 	if trInfo != nil {
   1192 		ss.mu.Lock()
   1193 		ss.trInfo.tr.LazyLog(stringer("OK"), false)
   1194 		ss.mu.Unlock()
   1195 	}
   1196 	return t.WriteStatus(ss.s, status.New(codes.OK, ""))
   1197 }
   1198 
   1199 func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
   1200 	sm := stream.Method()
   1201 	if sm != "" && sm[0] == '/' {
   1202 		sm = sm[1:]
   1203 	}
   1204 	pos := strings.LastIndex(sm, "/")
   1205 	if pos == -1 {
   1206 		if trInfo != nil {
   1207 			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
   1208 			trInfo.tr.SetError()
   1209 		}
   1210 		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
   1211 		if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
   1212 			if trInfo != nil {
   1213 				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
   1214 				trInfo.tr.SetError()
   1215 			}
   1216 			grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
   1217 		}
   1218 		if trInfo != nil {
   1219 			trInfo.tr.Finish()
   1220 		}
   1221 		return
   1222 	}
   1223 	service := sm[:pos]
   1224 	method := sm[pos+1:]
   1225 	srv, ok := s.m[service]
   1226 	if !ok {
   1227 		if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
   1228 			s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
   1229 			return
   1230 		}
   1231 		if trInfo != nil {
   1232 			trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
   1233 			trInfo.tr.SetError()
   1234 		}
   1235 		errDesc := fmt.Sprintf("unknown service %v", service)
   1236 		if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
   1237 			if trInfo != nil {
   1238 				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
   1239 				trInfo.tr.SetError()
   1240 			}
   1241 			grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
   1242 		}
   1243 		if trInfo != nil {
   1244 			trInfo.tr.Finish()
   1245 		}
   1246 		return
   1247 	}
   1248 	// Unary RPC or Streaming RPC?
   1249 	if md, ok := srv.md[method]; ok {
   1250 		s.processUnaryRPC(t, stream, srv, md, trInfo)
   1251 		return
   1252 	}
   1253 	if sd, ok := srv.sd[method]; ok {
   1254 		s.processStreamingRPC(t, stream, srv, sd, trInfo)
   1255 		return
   1256 	}
   1257 	if trInfo != nil {
   1258 		trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
   1259 		trInfo.tr.SetError()
   1260 	}
   1261 	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
   1262 		s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
   1263 		return
   1264 	}
   1265 	errDesc := fmt.Sprintf("unknown method %v", method)
   1266 	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
   1267 		if trInfo != nil {
   1268 			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
   1269 			trInfo.tr.SetError()
   1270 		}
   1271 		grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
   1272 	}
   1273 	if trInfo != nil {
   1274 		trInfo.tr.Finish()
   1275 	}
   1276 }
   1277 
// streamKey is the context key under which
// NewContextWithServerTransportStream stores a ServerTransportStream and
// under which ServerTransportStreamFromContext looks it up. Being an
// unexported type, it cannot collide with context keys defined by other
// packages.
type streamKey struct{}
   1280 
   1281 // NewContextWithServerTransportStream creates a new context from ctx and
   1282 // attaches stream to it.
   1283 //
   1284 // This API is EXPERIMENTAL.
   1285 func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
   1286 	return context.WithValue(ctx, streamKey{}, stream)
   1287 }
   1288 
// ServerTransportStream is a minimal interface that a transport stream must
// implement. This can be used to mock an actual transport stream for tests of
// handler code that use, for example, grpc.SetHeader (which requires some
// stream to be in context).
//
// See also NewContextWithServerTransportStream.
//
// This API is EXPERIMENTAL.
type ServerTransportStream interface {
	// Method returns the method string of the stream; grpc.Method returns
	// this value unchanged.
	Method() string
	// SetHeader is what grpc.SetHeader delegates to when md is non-empty.
	SetHeader(md metadata.MD) error
	// SendHeader is what grpc.SendHeader delegates to; a non-nil error is
	// passed through toRPCErr before reaching the caller.
	SendHeader(md metadata.MD) error
	// SetTrailer is what grpc.SetTrailer delegates to when md is non-empty.
	SetTrailer(md metadata.MD) error
}
   1303 
   1304 // ServerTransportStreamFromContext returns the ServerTransportStream saved in
   1305 // ctx. Returns nil if the given context has no stream associated with it
   1306 // (which implies it is not an RPC invocation context).
   1307 //
   1308 // This API is EXPERIMENTAL.
   1309 func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
   1310 	s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
   1311 	return s
   1312 }
   1313 
   1314 // Stop stops the gRPC server. It immediately closes all open
   1315 // connections and listeners.
   1316 // It cancels all active RPCs on the server side and the corresponding
   1317 // pending RPCs on the client side will get notified by connection
   1318 // errors.
   1319 func (s *Server) Stop() {
   1320 	s.quitOnce.Do(func() {
   1321 		close(s.quit)
   1322 	})
   1323 
   1324 	defer func() {
   1325 		s.serveWG.Wait()
   1326 		s.doneOnce.Do(func() {
   1327 			close(s.done)
   1328 		})
   1329 	}()
   1330 
   1331 	s.channelzRemoveOnce.Do(func() {
   1332 		if channelz.IsOn() {
   1333 			channelz.RemoveEntry(s.channelzID)
   1334 		}
   1335 	})
   1336 
   1337 	s.mu.Lock()
   1338 	listeners := s.lis
   1339 	s.lis = nil
   1340 	st := s.conns
   1341 	s.conns = nil
   1342 	// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
   1343 	s.cv.Broadcast()
   1344 	s.mu.Unlock()
   1345 
   1346 	for lis := range listeners {
   1347 		lis.Close()
   1348 	}
   1349 	for c := range st {
   1350 		c.Close()
   1351 	}
   1352 
   1353 	s.mu.Lock()
   1354 	if s.events != nil {
   1355 		s.events.Finish()
   1356 		s.events = nil
   1357 	}
   1358 	s.mu.Unlock()
   1359 }
   1360 
   1361 // GracefulStop stops the gRPC server gracefully. It stops the server from
   1362 // accepting new connections and RPCs and blocks until all the pending RPCs are
   1363 // finished.
   1364 func (s *Server) GracefulStop() {
   1365 	s.quitOnce.Do(func() {
   1366 		close(s.quit)
   1367 	})
   1368 
   1369 	defer func() {
   1370 		s.doneOnce.Do(func() {
   1371 			close(s.done)
   1372 		})
   1373 	}()
   1374 
   1375 	s.channelzRemoveOnce.Do(func() {
   1376 		if channelz.IsOn() {
   1377 			channelz.RemoveEntry(s.channelzID)
   1378 		}
   1379 	})
   1380 	s.mu.Lock()
   1381 	if s.conns == nil {
   1382 		s.mu.Unlock()
   1383 		return
   1384 	}
   1385 
   1386 	for lis := range s.lis {
   1387 		lis.Close()
   1388 	}
   1389 	s.lis = nil
   1390 	if !s.drain {
   1391 		for c := range s.conns {
   1392 			c.(transport.ServerTransport).Drain()
   1393 		}
   1394 		s.drain = true
   1395 	}
   1396 
   1397 	// Wait for serving threads to be ready to exit.  Only then can we be sure no
   1398 	// new conns will be created.
   1399 	s.mu.Unlock()
   1400 	s.serveWG.Wait()
   1401 	s.mu.Lock()
   1402 
   1403 	for len(s.conns) != 0 {
   1404 		s.cv.Wait()
   1405 	}
   1406 	s.conns = nil
   1407 	if s.events != nil {
   1408 		s.events.Finish()
   1409 		s.events = nil
   1410 	}
   1411 	s.mu.Unlock()
   1412 }
   1413 
   1414 func init() {
   1415 	internal.TestingUseHandlerImpl = func(arg interface{}) {
   1416 		arg.(*Server).opts.useHandlerImpl = true
   1417 	}
   1418 }
   1419 
   1420 // contentSubtype must be lowercase
   1421 // cannot return nil
   1422 func (s *Server) getCodec(contentSubtype string) baseCodec {
   1423 	if s.opts.codec != nil {
   1424 		return s.opts.codec
   1425 	}
   1426 	if contentSubtype == "" {
   1427 		return encoding.GetCodec(proto.Name)
   1428 	}
   1429 	codec := encoding.GetCodec(contentSubtype)
   1430 	if codec == nil {
   1431 		return encoding.GetCodec(proto.Name)
   1432 	}
   1433 	return codec
   1434 }
   1435 
   1436 // SetHeader sets the header metadata.
   1437 // When called multiple times, all the provided metadata will be merged.
   1438 // All the metadata will be sent out when one of the following happens:
   1439 //  - grpc.SendHeader() is called;
   1440 //  - The first response is sent out;
   1441 //  - An RPC status is sent out (error or success).
   1442 func SetHeader(ctx context.Context, md metadata.MD) error {
   1443 	if md.Len() == 0 {
   1444 		return nil
   1445 	}
   1446 	stream := ServerTransportStreamFromContext(ctx)
   1447 	if stream == nil {
   1448 		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
   1449 	}
   1450 	return stream.SetHeader(md)
   1451 }
   1452 
   1453 // SendHeader sends header metadata. It may be called at most once.
   1454 // The provided md and headers set by SetHeader() will be sent.
   1455 func SendHeader(ctx context.Context, md metadata.MD) error {
   1456 	stream := ServerTransportStreamFromContext(ctx)
   1457 	if stream == nil {
   1458 		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
   1459 	}
   1460 	if err := stream.SendHeader(md); err != nil {
   1461 		return toRPCErr(err)
   1462 	}
   1463 	return nil
   1464 }
   1465 
   1466 // SetTrailer sets the trailer metadata that will be sent when an RPC returns.
   1467 // When called more than once, all the provided metadata will be merged.
   1468 func SetTrailer(ctx context.Context, md metadata.MD) error {
   1469 	if md.Len() == 0 {
   1470 		return nil
   1471 	}
   1472 	stream := ServerTransportStreamFromContext(ctx)
   1473 	if stream == nil {
   1474 		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
   1475 	}
   1476 	return stream.SetTrailer(md)
   1477 }
   1478 
   1479 // Method returns the method string for the server context.  The returned
   1480 // string is in the format of "/service/method".
   1481 func Method(ctx context.Context) (string, bool) {
   1482 	s := ServerTransportStreamFromContext(ctx)
   1483 	if s == nil {
   1484 		return "", false
   1485 	}
   1486 	return s.Method(), true
   1487 }
   1488