1 /* 2 * 3 * Copyright 2016 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 // This file is the implementation of a gRPC server using HTTP/2 which 20 // uses the standard Go http2 Server implementation (via the 21 // http.Handler interface), rather than speaking low-level HTTP/2 22 // frames itself. It is the implementation of *grpc.Server.ServeHTTP. 23 24 package transport 25 26 import ( 27 "errors" 28 "fmt" 29 "io" 30 "net" 31 "net/http" 32 "strings" 33 "sync" 34 "time" 35 36 "github.com/golang/protobuf/proto" 37 "golang.org/x/net/context" 38 "golang.org/x/net/http2" 39 "google.golang.org/grpc/codes" 40 "google.golang.org/grpc/credentials" 41 "google.golang.org/grpc/metadata" 42 "google.golang.org/grpc/peer" 43 "google.golang.org/grpc/stats" 44 "google.golang.org/grpc/status" 45 ) 46 47 // NewServerHandlerTransport returns a ServerTransport handling gRPC 48 // from inside an http.Handler. It requires that the http Server 49 // supports HTTP/2. 50 func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats stats.Handler) (ServerTransport, error) { 51 if r.ProtoMajor != 2 { 52 return nil, errors.New("gRPC requires HTTP/2") 53 } 54 if r.Method != "POST" { 55 return nil, errors.New("invalid gRPC request method") 56 } 57 contentType := r.Header.Get("Content-Type") 58 // TODO: do we assume contentType is lowercase? we did before 59 contentSubtype, validContentType := contentSubtype(contentType) 60 if !validContentType { 61 return nil, errors.New("invalid gRPC request content-type") 62 } 63 if _, ok := w.(http.Flusher); !ok { 64 return nil, errors.New("gRPC requires a ResponseWriter supporting http.Flusher") 65 } 66 if _, ok := w.(http.CloseNotifier); !ok { 67 return nil, errors.New("gRPC requires a ResponseWriter supporting http.CloseNotifier") 68 } 69 70 st := &serverHandlerTransport{ 71 rw: w, 72 req: r, 73 closedCh: make(chan struct{}), 74 writes: make(chan func()), 75 contentType: contentType, 76 contentSubtype: contentSubtype, 77 stats: stats, 78 } 79 80 if v := r.Header.Get("grpc-timeout"); v != "" { 81 to, err := decodeTimeout(v) 82 if err != nil { 83 return nil, streamErrorf(codes.Internal, "malformed time-out: %v", err) 84 } 85 st.timeoutSet = true 86 st.timeout = to 87 } 88 89 metakv := []string{"content-type", contentType} 90 if r.Host != "" { 91 metakv = append(metakv, ":authority", r.Host) 92 } 93 for k, vv := range r.Header { 94 k = strings.ToLower(k) 95 if isReservedHeader(k) && !isWhitelistedHeader(k) { 96 continue 97 } 98 for _, v := range vv { 99 v, err := decodeMetadataHeader(k, v) 100 if err != nil { 101 return nil, streamErrorf(codes.Internal, "malformed binary metadata: %v", err) 102 } 103 metakv = append(metakv, k, v) 104 } 105 } 106 st.headerMD = metadata.Pairs(metakv...) 107 108 return st, nil 109 } 110 111 // serverHandlerTransport is an implementation of ServerTransport 112 // which replies to exactly one gRPC request (exactly one HTTP request), 113 // using the net/http.Handler interface. This http.Handler is guaranteed 114 // at this point to be speaking over HTTP/2, so it's able to speak valid 115 // gRPC. 116 type serverHandlerTransport struct { 117 rw http.ResponseWriter 118 req *http.Request 119 timeoutSet bool 120 timeout time.Duration 121 didCommonHeaders bool 122 123 headerMD metadata.MD 124 125 closeOnce sync.Once 126 closedCh chan struct{} // closed on Close 127 128 // writes is a channel of code to run serialized in the 129 // ServeHTTP (HandleStreams) goroutine. The channel is closed 130 // when WriteStatus is called. 131 writes chan func() 132 133 // block concurrent WriteStatus calls 134 // e.g. grpc/(*serverStream).SendMsg/RecvMsg 135 writeStatusMu sync.Mutex 136 137 // we just mirror the request content-type 138 contentType string 139 // we store both contentType and contentSubtype so we don't keep recreating them 140 // TODO make sure this is consistent across handler_server and http2_server 141 contentSubtype string 142 143 stats stats.Handler 144 } 145 146 func (ht *serverHandlerTransport) Close() error { 147 ht.closeOnce.Do(ht.closeCloseChanOnce) 148 return nil 149 } 150 151 func (ht *serverHandlerTransport) closeCloseChanOnce() { close(ht.closedCh) } 152 153 func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) } 154 155 // strAddr is a net.Addr backed by either a TCP "ip:port" string, or 156 // the empty string if unknown. 157 type strAddr string 158 159 func (a strAddr) Network() string { 160 if a != "" { 161 // Per the documentation on net/http.Request.RemoteAddr, if this is 162 // set, it's set to the IP:port of the peer (hence, TCP): 163 // https://golang.org/pkg/net/http/#Request 164 // 165 // If we want to support Unix sockets later, we can 166 // add our own grpc-specific convention within the 167 // grpc codebase to set RemoteAddr to a different 168 // format, or probably better: we can attach it to the 169 // context and use that from serverHandlerTransport.RemoteAddr. 170 return "tcp" 171 } 172 return "" 173 } 174 175 func (a strAddr) String() string { return string(a) } 176 177 // do runs fn in the ServeHTTP goroutine. 178 func (ht *serverHandlerTransport) do(fn func()) error { 179 // Avoid a panic writing to closed channel. Imperfect but maybe good enough. 180 select { 181 case <-ht.closedCh: 182 return ErrConnClosing 183 default: 184 select { 185 case ht.writes <- fn: 186 return nil 187 case <-ht.closedCh: 188 return ErrConnClosing 189 } 190 } 191 } 192 193 func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error { 194 ht.writeStatusMu.Lock() 195 defer ht.writeStatusMu.Unlock() 196 197 err := ht.do(func() { 198 ht.writeCommonHeaders(s) 199 200 // And flush, in case no header or body has been sent yet. 201 // This forces a separation of headers and trailers if this is the 202 // first call (for example, in end2end tests's TestNoService). 203 ht.rw.(http.Flusher).Flush() 204 205 h := ht.rw.Header() 206 h.Set("Grpc-Status", fmt.Sprintf("%d", st.Code())) 207 if m := st.Message(); m != "" { 208 h.Set("Grpc-Message", encodeGrpcMessage(m)) 209 } 210 211 if p := st.Proto(); p != nil && len(p.Details) > 0 { 212 stBytes, err := proto.Marshal(p) 213 if err != nil { 214 // TODO: return error instead, when callers are able to handle it. 215 panic(err) 216 } 217 218 h.Set("Grpc-Status-Details-Bin", encodeBinHeader(stBytes)) 219 } 220 221 if md := s.Trailer(); len(md) > 0 { 222 for k, vv := range md { 223 // Clients don't tolerate reading restricted headers after some non restricted ones were sent. 224 if isReservedHeader(k) { 225 continue 226 } 227 for _, v := range vv { 228 // http2 ResponseWriter mechanism to send undeclared Trailers after 229 // the headers have possibly been written. 230 h.Add(http2.TrailerPrefix+k, encodeMetadataHeader(k, v)) 231 } 232 } 233 } 234 }) 235 236 if err == nil { // transport has not been closed 237 if ht.stats != nil { 238 ht.stats.HandleRPC(s.Context(), &stats.OutTrailer{}) 239 } 240 ht.Close() 241 close(ht.writes) 242 } 243 return err 244 } 245 246 // writeCommonHeaders sets common headers on the first write 247 // call (Write, WriteHeader, or WriteStatus). 248 func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) { 249 if ht.didCommonHeaders { 250 return 251 } 252 ht.didCommonHeaders = true 253 254 h := ht.rw.Header() 255 h["Date"] = nil // suppress Date to make tests happy; TODO: restore 256 h.Set("Content-Type", ht.contentType) 257 258 // Predeclare trailers we'll set later in WriteStatus (after the body). 259 // This is a SHOULD in the HTTP RFC, and the way you add (known) 260 // Trailers per the net/http.ResponseWriter contract. 261 // See https://golang.org/pkg/net/http/#ResponseWriter 262 // and https://golang.org/pkg/net/http/#example_ResponseWriter_trailers 263 h.Add("Trailer", "Grpc-Status") 264 h.Add("Trailer", "Grpc-Message") 265 h.Add("Trailer", "Grpc-Status-Details-Bin") 266 267 if s.sendCompress != "" { 268 h.Set("Grpc-Encoding", s.sendCompress) 269 } 270 } 271 272 func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, opts *Options) error { 273 return ht.do(func() { 274 ht.writeCommonHeaders(s) 275 ht.rw.Write(hdr) 276 ht.rw.Write(data) 277 if !opts.Delay { 278 ht.rw.(http.Flusher).Flush() 279 } 280 }) 281 } 282 283 func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error { 284 err := ht.do(func() { 285 ht.writeCommonHeaders(s) 286 h := ht.rw.Header() 287 for k, vv := range md { 288 // Clients don't tolerate reading restricted headers after some non restricted ones were sent. 289 if isReservedHeader(k) { 290 continue 291 } 292 for _, v := range vv { 293 v = encodeMetadataHeader(k, v) 294 h.Add(k, v) 295 } 296 } 297 ht.rw.WriteHeader(200) 298 ht.rw.(http.Flusher).Flush() 299 }) 300 301 if err == nil { 302 if ht.stats != nil { 303 ht.stats.HandleRPC(s.Context(), &stats.OutHeader{}) 304 } 305 } 306 return err 307 } 308 309 func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) { 310 // With this transport type there will be exactly 1 stream: this HTTP request. 311 312 ctx := contextFromRequest(ht.req) 313 var cancel context.CancelFunc 314 if ht.timeoutSet { 315 ctx, cancel = context.WithTimeout(ctx, ht.timeout) 316 } else { 317 ctx, cancel = context.WithCancel(ctx) 318 } 319 320 // requestOver is closed when either the request's context is done 321 // or the status has been written via WriteStatus. 322 requestOver := make(chan struct{}) 323 324 // clientGone receives a single value if peer is gone, either 325 // because the underlying connection is dead or because the 326 // peer sends an http2 RST_STREAM. 327 clientGone := ht.rw.(http.CloseNotifier).CloseNotify() 328 go func() { 329 select { 330 case <-requestOver: 331 return 332 case <-ht.closedCh: 333 case <-clientGone: 334 } 335 cancel() 336 }() 337 338 req := ht.req 339 340 s := &Stream{ 341 id: 0, // irrelevant 342 requestRead: func(int) {}, 343 cancel: cancel, 344 buf: newRecvBuffer(), 345 st: ht, 346 method: req.URL.Path, 347 recvCompress: req.Header.Get("grpc-encoding"), 348 contentSubtype: ht.contentSubtype, 349 } 350 pr := &peer.Peer{ 351 Addr: ht.RemoteAddr(), 352 } 353 if req.TLS != nil { 354 pr.AuthInfo = credentials.TLSInfo{State: *req.TLS} 355 } 356 ctx = metadata.NewIncomingContext(ctx, ht.headerMD) 357 s.ctx = peer.NewContext(ctx, pr) 358 if ht.stats != nil { 359 s.ctx = ht.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method}) 360 inHeader := &stats.InHeader{ 361 FullMethod: s.method, 362 RemoteAddr: ht.RemoteAddr(), 363 Compression: s.recvCompress, 364 } 365 ht.stats.HandleRPC(s.ctx, inHeader) 366 } 367 s.trReader = &transportReader{ 368 reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf}, 369 windowHandler: func(int) {}, 370 } 371 372 // readerDone is closed when the Body.Read-ing goroutine exits. 373 readerDone := make(chan struct{}) 374 go func() { 375 defer close(readerDone) 376 377 // TODO: minimize garbage, optimize recvBuffer code/ownership 378 const readSize = 8196 379 for buf := make([]byte, readSize); ; { 380 n, err := req.Body.Read(buf) 381 if n > 0 { 382 s.buf.put(recvMsg{data: buf[:n:n]}) 383 buf = buf[n:] 384 } 385 if err != nil { 386 s.buf.put(recvMsg{err: mapRecvMsgError(err)}) 387 return 388 } 389 if len(buf) == 0 { 390 buf = make([]byte, readSize) 391 } 392 } 393 }() 394 395 // startStream is provided by the *grpc.Server's serveStreams. 396 // It starts a goroutine serving s and exits immediately. 397 // The goroutine that is started is the one that then calls 398 // into ht, calling WriteHeader, Write, WriteStatus, Close, etc. 399 startStream(s) 400 401 ht.runStream() 402 close(requestOver) 403 404 // Wait for reading goroutine to finish. 405 req.Body.Close() 406 <-readerDone 407 } 408 409 func (ht *serverHandlerTransport) runStream() { 410 for { 411 select { 412 case fn, ok := <-ht.writes: 413 if !ok { 414 return 415 } 416 fn() 417 case <-ht.closedCh: 418 return 419 } 420 } 421 } 422 423 func (ht *serverHandlerTransport) IncrMsgSent() {} 424 425 func (ht *serverHandlerTransport) IncrMsgRecv() {} 426 427 func (ht *serverHandlerTransport) Drain() { 428 panic("Drain() is not implemented") 429 } 430 431 // mapRecvMsgError returns the non-nil err into the appropriate 432 // error value as expected by callers of *grpc.parser.recvMsg. 433 // In particular, in can only be: 434 // * io.EOF 435 // * io.ErrUnexpectedEOF 436 // * of type transport.ConnectionError 437 // * of type transport.StreamError 438 func mapRecvMsgError(err error) error { 439 if err == io.EOF || err == io.ErrUnexpectedEOF { 440 return err 441 } 442 if se, ok := err.(http2.StreamError); ok { 443 if code, ok := http2ErrConvTab[se.Code]; ok { 444 return StreamError{ 445 Code: code, 446 Desc: se.Error(), 447 } 448 } 449 } 450 return connectionErrorf(true, err, err.Error()) 451 } 452