1 // Copyright 2011 Google Inc. All rights reserved. 2 // Use of this source code is governed by the Apache 2.0 3 // license that can be found in the LICENSE file. 4 5 // +build !appengine 6 // +build !go1.7 7 8 package internal 9 10 import ( 11 "bytes" 12 "errors" 13 "fmt" 14 "io/ioutil" 15 "log" 16 "net" 17 "net/http" 18 "net/url" 19 "os" 20 "runtime" 21 "strconv" 22 "strings" 23 "sync" 24 "sync/atomic" 25 "time" 26 27 "github.com/golang/protobuf/proto" 28 netcontext "golang.org/x/net/context" 29 30 basepb "google.golang.org/appengine/internal/base" 31 logpb "google.golang.org/appengine/internal/log" 32 remotepb "google.golang.org/appengine/internal/remote_api" 33 ) 34 35 const ( 36 apiPath = "/rpc_http" 37 defaultTicketSuffix = "/default.20150612t184001.0" 38 ) 39 40 var ( 41 // Incoming headers. 42 ticketHeader = http.CanonicalHeaderKey("X-AppEngine-API-Ticket") 43 dapperHeader = http.CanonicalHeaderKey("X-Google-DapperTraceInfo") 44 traceHeader = http.CanonicalHeaderKey("X-Cloud-Trace-Context") 45 curNamespaceHeader = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace") 46 userIPHeader = http.CanonicalHeaderKey("X-AppEngine-User-IP") 47 remoteAddrHeader = http.CanonicalHeaderKey("X-AppEngine-Remote-Addr") 48 49 // Outgoing headers. 50 apiEndpointHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Endpoint") 51 apiEndpointHeaderValue = []string{"app-engine-apis"} 52 apiMethodHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Method") 53 apiMethodHeaderValue = []string{"/VMRemoteAPI.CallRemoteAPI"} 54 apiDeadlineHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Deadline") 55 apiContentType = http.CanonicalHeaderKey("Content-Type") 56 apiContentTypeValue = []string{"application/octet-stream"} 57 logFlushHeader = http.CanonicalHeaderKey("X-AppEngine-Log-Flush-Count") 58 59 apiHTTPClient = &http.Client{ 60 Transport: &http.Transport{ 61 Proxy: http.ProxyFromEnvironment, 62 Dial: limitDial, 63 }, 64 } 65 66 defaultTicketOnce sync.Once 67 defaultTicket string 68 ) 69 70 func apiURL() *url.URL { 71 host, port := "appengine.googleapis.internal", "10001" 72 if h := os.Getenv("API_HOST"); h != "" { 73 host = h 74 } 75 if p := os.Getenv("API_PORT"); p != "" { 76 port = p 77 } 78 return &url.URL{ 79 Scheme: "http", 80 Host: host + ":" + port, 81 Path: apiPath, 82 } 83 } 84 85 func handleHTTP(w http.ResponseWriter, r *http.Request) { 86 c := &context{ 87 req: r, 88 outHeader: w.Header(), 89 apiURL: apiURL(), 90 } 91 stopFlushing := make(chan int) 92 93 ctxs.Lock() 94 ctxs.m[r] = c 95 ctxs.Unlock() 96 defer func() { 97 ctxs.Lock() 98 delete(ctxs.m, r) 99 ctxs.Unlock() 100 }() 101 102 // Patch up RemoteAddr so it looks reasonable. 103 if addr := r.Header.Get(userIPHeader); addr != "" { 104 r.RemoteAddr = addr 105 } else if addr = r.Header.Get(remoteAddrHeader); addr != "" { 106 r.RemoteAddr = addr 107 } else { 108 // Should not normally reach here, but pick a sensible default anyway. 109 r.RemoteAddr = "127.0.0.1" 110 } 111 // The address in the headers will most likely be of these forms: 112 // 123.123.123.123 113 // 2001:db8::1 114 // net/http.Request.RemoteAddr is specified to be in "IP:port" form. 115 if _, _, err := net.SplitHostPort(r.RemoteAddr); err != nil { 116 // Assume the remote address is only a host; add a default port. 117 r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80") 118 } 119 120 // Start goroutine responsible for flushing app logs. 121 // This is done after adding c to ctx.m (and stopped before removing it) 122 // because flushing logs requires making an API call. 123 go c.logFlusher(stopFlushing) 124 125 executeRequestSafely(c, r) 126 c.outHeader = nil // make sure header changes aren't respected any more 127 128 stopFlushing <- 1 // any logging beyond this point will be dropped 129 130 // Flush any pending logs asynchronously. 131 c.pendingLogs.Lock() 132 flushes := c.pendingLogs.flushes 133 if len(c.pendingLogs.lines) > 0 { 134 flushes++ 135 } 136 c.pendingLogs.Unlock() 137 go c.flushLog(false) 138 w.Header().Set(logFlushHeader, strconv.Itoa(flushes)) 139 140 // Avoid nil Write call if c.Write is never called. 141 if c.outCode != 0 { 142 w.WriteHeader(c.outCode) 143 } 144 if c.outBody != nil { 145 w.Write(c.outBody) 146 } 147 } 148 149 func executeRequestSafely(c *context, r *http.Request) { 150 defer func() { 151 if x := recover(); x != nil { 152 logf(c, 4, "%s", renderPanic(x)) // 4 == critical 153 c.outCode = 500 154 } 155 }() 156 157 http.DefaultServeMux.ServeHTTP(c, r) 158 } 159 160 func renderPanic(x interface{}) string { 161 buf := make([]byte, 16<<10) // 16 KB should be plenty 162 buf = buf[:runtime.Stack(buf, false)] 163 164 // Remove the first few stack frames: 165 // this func 166 // the recover closure in the caller 167 // That will root the stack trace at the site of the panic. 168 const ( 169 skipStart = "internal.renderPanic" 170 skipFrames = 2 171 ) 172 start := bytes.Index(buf, []byte(skipStart)) 173 p := start 174 for i := 0; i < skipFrames*2 && p+1 < len(buf); i++ { 175 p = bytes.IndexByte(buf[p+1:], '\n') + p + 1 176 if p < 0 { 177 break 178 } 179 } 180 if p >= 0 { 181 // buf[start:p+1] is the block to remove. 182 // Copy buf[p+1:] over buf[start:] and shrink buf. 183 copy(buf[start:], buf[p+1:]) 184 buf = buf[:len(buf)-(p+1-start)] 185 } 186 187 // Add panic heading. 188 head := fmt.Sprintf("panic: %v\n\n", x) 189 if len(head) > len(buf) { 190 // Extremely unlikely to happen. 191 return head 192 } 193 copy(buf[len(head):], buf) 194 copy(buf, head) 195 196 return string(buf) 197 } 198 199 var ctxs = struct { 200 sync.Mutex 201 m map[*http.Request]*context 202 bg *context // background context, lazily initialized 203 // dec is used by tests to decorate the netcontext.Context returned 204 // for a given request. This allows tests to add overrides (such as 205 // WithAppIDOverride) to the context. The map is nil outside tests. 206 dec map[*http.Request]func(netcontext.Context) netcontext.Context 207 }{ 208 m: make(map[*http.Request]*context), 209 } 210 211 // context represents the context of an in-flight HTTP request. 212 // It implements the appengine.Context and http.ResponseWriter interfaces. 213 type context struct { 214 req *http.Request 215 216 outCode int 217 outHeader http.Header 218 outBody []byte 219 220 pendingLogs struct { 221 sync.Mutex 222 lines []*logpb.UserAppLogLine 223 flushes int 224 } 225 226 apiURL *url.URL 227 } 228 229 var contextKey = "holds a *context" 230 231 // fromContext returns the App Engine context or nil if ctx is not 232 // derived from an App Engine context. 233 func fromContext(ctx netcontext.Context) *context { 234 c, _ := ctx.Value(&contextKey).(*context) 235 return c 236 } 237 238 func withContext(parent netcontext.Context, c *context) netcontext.Context { 239 ctx := netcontext.WithValue(parent, &contextKey, c) 240 if ns := c.req.Header.Get(curNamespaceHeader); ns != "" { 241 ctx = withNamespace(ctx, ns) 242 } 243 return ctx 244 } 245 246 func toContext(c *context) netcontext.Context { 247 return withContext(netcontext.Background(), c) 248 } 249 250 func IncomingHeaders(ctx netcontext.Context) http.Header { 251 if c := fromContext(ctx); c != nil { 252 return c.req.Header 253 } 254 return nil 255 } 256 257 func ReqContext(req *http.Request) netcontext.Context { 258 return WithContext(netcontext.Background(), req) 259 } 260 261 func WithContext(parent netcontext.Context, req *http.Request) netcontext.Context { 262 ctxs.Lock() 263 c := ctxs.m[req] 264 d := ctxs.dec[req] 265 ctxs.Unlock() 266 267 if d != nil { 268 parent = d(parent) 269 } 270 271 if c == nil { 272 // Someone passed in an http.Request that is not in-flight. 273 // We panic here rather than panicking at a later point 274 // so that stack traces will be more sensible. 275 log.Panic("appengine: NewContext passed an unknown http.Request") 276 } 277 return withContext(parent, c) 278 } 279 280 // DefaultTicket returns a ticket used for background context or dev_appserver. 281 func DefaultTicket() string { 282 defaultTicketOnce.Do(func() { 283 if IsDevAppServer() { 284 defaultTicket = "testapp" + defaultTicketSuffix 285 return 286 } 287 appID := partitionlessAppID() 288 escAppID := strings.Replace(strings.Replace(appID, ":", "_", -1), ".", "_", -1) 289 majVersion := VersionID(nil) 290 if i := strings.Index(majVersion, "."); i > 0 { 291 majVersion = majVersion[:i] 292 } 293 defaultTicket = fmt.Sprintf("%s/%s.%s.%s", escAppID, ModuleName(nil), majVersion, InstanceID()) 294 }) 295 return defaultTicket 296 } 297 298 func BackgroundContext() netcontext.Context { 299 ctxs.Lock() 300 defer ctxs.Unlock() 301 302 if ctxs.bg != nil { 303 return toContext(ctxs.bg) 304 } 305 306 // Compute background security ticket. 307 ticket := DefaultTicket() 308 309 ctxs.bg = &context{ 310 req: &http.Request{ 311 Header: http.Header{ 312 ticketHeader: []string{ticket}, 313 }, 314 }, 315 apiURL: apiURL(), 316 } 317 318 // TODO(dsymonds): Wire up the shutdown handler to do a final flush. 319 go ctxs.bg.logFlusher(make(chan int)) 320 321 return toContext(ctxs.bg) 322 } 323 324 // RegisterTestRequest registers the HTTP request req for testing, such that 325 // any API calls are sent to the provided URL. It returns a closure to delete 326 // the registration. 327 // It should only be used by aetest package. 328 func RegisterTestRequest(req *http.Request, apiURL *url.URL, decorate func(netcontext.Context) netcontext.Context) (*http.Request, func()) { 329 c := &context{ 330 req: req, 331 apiURL: apiURL, 332 } 333 ctxs.Lock() 334 defer ctxs.Unlock() 335 if _, ok := ctxs.m[req]; ok { 336 log.Panic("req already associated with context") 337 } 338 if _, ok := ctxs.dec[req]; ok { 339 log.Panic("req already associated with context") 340 } 341 if ctxs.dec == nil { 342 ctxs.dec = make(map[*http.Request]func(netcontext.Context) netcontext.Context) 343 } 344 ctxs.m[req] = c 345 ctxs.dec[req] = decorate 346 347 return req, func() { 348 ctxs.Lock() 349 delete(ctxs.m, req) 350 delete(ctxs.dec, req) 351 ctxs.Unlock() 352 } 353 } 354 355 var errTimeout = &CallError{ 356 Detail: "Deadline exceeded", 357 Code: int32(remotepb.RpcError_CANCELLED), 358 Timeout: true, 359 } 360 361 func (c *context) Header() http.Header { return c.outHeader } 362 363 // Copied from $GOROOT/src/pkg/net/http/transfer.go. Some response status 364 // codes do not permit a response body (nor response entity headers such as 365 // Content-Length, Content-Type, etc). 366 func bodyAllowedForStatus(status int) bool { 367 switch { 368 case status >= 100 && status <= 199: 369 return false 370 case status == 204: 371 return false 372 case status == 304: 373 return false 374 } 375 return true 376 } 377 378 func (c *context) Write(b []byte) (int, error) { 379 if c.outCode == 0 { 380 c.WriteHeader(http.StatusOK) 381 } 382 if len(b) > 0 && !bodyAllowedForStatus(c.outCode) { 383 return 0, http.ErrBodyNotAllowed 384 } 385 c.outBody = append(c.outBody, b...) 386 return len(b), nil 387 } 388 389 func (c *context) WriteHeader(code int) { 390 if c.outCode != 0 { 391 logf(c, 3, "WriteHeader called multiple times on request.") // error level 392 return 393 } 394 c.outCode = code 395 } 396 397 func (c *context) post(body []byte, timeout time.Duration) (b []byte, err error) { 398 hreq := &http.Request{ 399 Method: "POST", 400 URL: c.apiURL, 401 Header: http.Header{ 402 apiEndpointHeader: apiEndpointHeaderValue, 403 apiMethodHeader: apiMethodHeaderValue, 404 apiContentType: apiContentTypeValue, 405 apiDeadlineHeader: []string{strconv.FormatFloat(timeout.Seconds(), 'f', -1, 64)}, 406 }, 407 Body: ioutil.NopCloser(bytes.NewReader(body)), 408 ContentLength: int64(len(body)), 409 Host: c.apiURL.Host, 410 } 411 if info := c.req.Header.Get(dapperHeader); info != "" { 412 hreq.Header.Set(dapperHeader, info) 413 } 414 if info := c.req.Header.Get(traceHeader); info != "" { 415 hreq.Header.Set(traceHeader, info) 416 } 417 418 tr := apiHTTPClient.Transport.(*http.Transport) 419 420 var timedOut int32 // atomic; set to 1 if timed out 421 t := time.AfterFunc(timeout, func() { 422 atomic.StoreInt32(&timedOut, 1) 423 tr.CancelRequest(hreq) 424 }) 425 defer t.Stop() 426 defer func() { 427 // Check if timeout was exceeded. 428 if atomic.LoadInt32(&timedOut) != 0 { 429 err = errTimeout 430 } 431 }() 432 433 hresp, err := apiHTTPClient.Do(hreq) 434 if err != nil { 435 return nil, &CallError{ 436 Detail: fmt.Sprintf("service bridge HTTP failed: %v", err), 437 Code: int32(remotepb.RpcError_UNKNOWN), 438 } 439 } 440 defer hresp.Body.Close() 441 hrespBody, err := ioutil.ReadAll(hresp.Body) 442 if hresp.StatusCode != 200 { 443 return nil, &CallError{ 444 Detail: fmt.Sprintf("service bridge returned HTTP %d (%q)", hresp.StatusCode, hrespBody), 445 Code: int32(remotepb.RpcError_UNKNOWN), 446 } 447 } 448 if err != nil { 449 return nil, &CallError{ 450 Detail: fmt.Sprintf("service bridge response bad: %v", err), 451 Code: int32(remotepb.RpcError_UNKNOWN), 452 } 453 } 454 return hrespBody, nil 455 } 456 457 func Call(ctx netcontext.Context, service, method string, in, out proto.Message) error { 458 if ns := NamespaceFromContext(ctx); ns != "" { 459 if fn, ok := NamespaceMods[service]; ok { 460 fn(in, ns) 461 } 462 } 463 464 if f, ctx, ok := callOverrideFromContext(ctx); ok { 465 return f(ctx, service, method, in, out) 466 } 467 468 // Handle already-done contexts quickly. 469 select { 470 case <-ctx.Done(): 471 return ctx.Err() 472 default: 473 } 474 475 c := fromContext(ctx) 476 if c == nil { 477 // Give a good error message rather than a panic lower down. 478 return errNotAppEngineContext 479 } 480 481 // Apply transaction modifications if we're in a transaction. 482 if t := transactionFromContext(ctx); t != nil { 483 if t.finished { 484 return errors.New("transaction context has expired") 485 } 486 applyTransaction(in, &t.transaction) 487 } 488 489 // Default RPC timeout is 60s. 490 timeout := 60 * time.Second 491 if deadline, ok := ctx.Deadline(); ok { 492 timeout = deadline.Sub(time.Now()) 493 } 494 495 data, err := proto.Marshal(in) 496 if err != nil { 497 return err 498 } 499 500 ticket := c.req.Header.Get(ticketHeader) 501 // Use a test ticket under test environment. 502 if ticket == "" { 503 if appid := ctx.Value(&appIDOverrideKey); appid != nil { 504 ticket = appid.(string) + defaultTicketSuffix 505 } 506 } 507 // Fall back to use background ticket when the request ticket is not available in Flex or dev_appserver. 508 if ticket == "" { 509 ticket = DefaultTicket() 510 } 511 req := &remotepb.Request{ 512 ServiceName: &service, 513 Method: &method, 514 Request: data, 515 RequestId: &ticket, 516 } 517 hreqBody, err := proto.Marshal(req) 518 if err != nil { 519 return err 520 } 521 522 hrespBody, err := c.post(hreqBody, timeout) 523 if err != nil { 524 return err 525 } 526 527 res := &remotepb.Response{} 528 if err := proto.Unmarshal(hrespBody, res); err != nil { 529 return err 530 } 531 if res.RpcError != nil { 532 ce := &CallError{ 533 Detail: res.RpcError.GetDetail(), 534 Code: *res.RpcError.Code, 535 } 536 switch remotepb.RpcError_ErrorCode(ce.Code) { 537 case remotepb.RpcError_CANCELLED, remotepb.RpcError_DEADLINE_EXCEEDED: 538 ce.Timeout = true 539 } 540 return ce 541 } 542 if res.ApplicationError != nil { 543 return &APIError{ 544 Service: *req.ServiceName, 545 Detail: res.ApplicationError.GetDetail(), 546 Code: *res.ApplicationError.Code, 547 } 548 } 549 if res.Exception != nil || res.JavaException != nil { 550 // This shouldn't happen, but let's be defensive. 551 return &CallError{ 552 Detail: "service bridge returned exception", 553 Code: int32(remotepb.RpcError_UNKNOWN), 554 } 555 } 556 return proto.Unmarshal(res.Response, out) 557 } 558 559 func (c *context) Request() *http.Request { 560 return c.req 561 } 562 563 func (c *context) addLogLine(ll *logpb.UserAppLogLine) { 564 // Truncate long log lines. 565 // TODO(dsymonds): Check if this is still necessary. 566 const lim = 8 << 10 567 if len(*ll.Message) > lim { 568 suffix := fmt.Sprintf("...(length %d)", len(*ll.Message)) 569 ll.Message = proto.String((*ll.Message)[:lim-len(suffix)] + suffix) 570 } 571 572 c.pendingLogs.Lock() 573 c.pendingLogs.lines = append(c.pendingLogs.lines, ll) 574 c.pendingLogs.Unlock() 575 } 576 577 var logLevelName = map[int64]string{ 578 0: "DEBUG", 579 1: "INFO", 580 2: "WARNING", 581 3: "ERROR", 582 4: "CRITICAL", 583 } 584 585 func logf(c *context, level int64, format string, args ...interface{}) { 586 if c == nil { 587 panic("not an App Engine context") 588 } 589 s := fmt.Sprintf(format, args...) 590 s = strings.TrimRight(s, "\n") // Remove any trailing newline characters. 591 c.addLogLine(&logpb.UserAppLogLine{ 592 TimestampUsec: proto.Int64(time.Now().UnixNano() / 1e3), 593 Level: &level, 594 Message: &s, 595 }) 596 log.Print(logLevelName[level] + ": " + s) 597 } 598 599 // flushLog attempts to flush any pending logs to the appserver. 600 // It should not be called concurrently. 601 func (c *context) flushLog(force bool) (flushed bool) { 602 c.pendingLogs.Lock() 603 // Grab up to 30 MB. We can get away with up to 32 MB, but let's be cautious. 604 n, rem := 0, 30<<20 605 for ; n < len(c.pendingLogs.lines); n++ { 606 ll := c.pendingLogs.lines[n] 607 // Each log line will require about 3 bytes of overhead. 608 nb := proto.Size(ll) + 3 609 if nb > rem { 610 break 611 } 612 rem -= nb 613 } 614 lines := c.pendingLogs.lines[:n] 615 c.pendingLogs.lines = c.pendingLogs.lines[n:] 616 c.pendingLogs.Unlock() 617 618 if len(lines) == 0 && !force { 619 // Nothing to flush. 620 return false 621 } 622 623 rescueLogs := false 624 defer func() { 625 if rescueLogs { 626 c.pendingLogs.Lock() 627 c.pendingLogs.lines = append(lines, c.pendingLogs.lines...) 628 c.pendingLogs.Unlock() 629 } 630 }() 631 632 buf, err := proto.Marshal(&logpb.UserAppLogGroup{ 633 LogLine: lines, 634 }) 635 if err != nil { 636 log.Printf("internal.flushLog: marshaling UserAppLogGroup: %v", err) 637 rescueLogs = true 638 return false 639 } 640 641 req := &logpb.FlushRequest{ 642 Logs: buf, 643 } 644 res := &basepb.VoidProto{} 645 c.pendingLogs.Lock() 646 c.pendingLogs.flushes++ 647 c.pendingLogs.Unlock() 648 if err := Call(toContext(c), "logservice", "Flush", req, res); err != nil { 649 log.Printf("internal.flushLog: Flush RPC: %v", err) 650 rescueLogs = true 651 return false 652 } 653 return true 654 } 655 656 const ( 657 // Log flushing parameters. 658 flushInterval = 1 * time.Second 659 forceFlushInterval = 60 * time.Second 660 ) 661 662 func (c *context) logFlusher(stop <-chan int) { 663 lastFlush := time.Now() 664 tick := time.NewTicker(flushInterval) 665 for { 666 select { 667 case <-stop: 668 // Request finished. 669 tick.Stop() 670 return 671 case <-tick.C: 672 force := time.Now().Sub(lastFlush) > forceFlushInterval 673 if c.flushLog(force) { 674 lastFlush = time.Now() 675 } 676 } 677 } 678 } 679 680 func ContextForTesting(req *http.Request) netcontext.Context { 681 return toContext(&context{req: req}) 682 } 683