Home | History | Annotate | Download | only in internal
      1 // Copyright 2011 Google Inc. All rights reserved.
      2 // Use of this source code is governed by the Apache 2.0
      3 // license that can be found in the LICENSE file.
      4 
      5 // +build !appengine
      6 // +build !go1.7
      7 
      8 package internal
      9 
     10 import (
     11 	"bytes"
     12 	"errors"
     13 	"fmt"
     14 	"io/ioutil"
     15 	"log"
     16 	"net"
     17 	"net/http"
     18 	"net/url"
     19 	"os"
     20 	"runtime"
     21 	"strconv"
     22 	"strings"
     23 	"sync"
     24 	"sync/atomic"
     25 	"time"
     26 
     27 	"github.com/golang/protobuf/proto"
     28 	netcontext "golang.org/x/net/context"
     29 
     30 	basepb "google.golang.org/appengine/internal/base"
     31 	logpb "google.golang.org/appengine/internal/log"
     32 	remotepb "google.golang.org/appengine/internal/remote_api"
     33 )
     34 
// apiPath is the URL path that apiURL puts on the API endpoint URL.
// defaultTicketSuffix is appended to an app ID to build a ticket string when
// no request ticket is available (see DefaultTicket and Call).
const (
	apiPath             = "/rpc_http"
	defaultTicketSuffix = "/default.20150612t184001.0"
)
     39 
var (
	// Incoming headers. These are looked up on the inbound request:
	// the ticket by Call, the trace headers by post (which copies them onto
	// the outgoing API request), the namespace by withContext, and the two
	// address headers by handleHTTP when patching up RemoteAddr.
	ticketHeader       = http.CanonicalHeaderKey("X-AppEngine-API-Ticket")
	dapperHeader       = http.CanonicalHeaderKey("X-Google-DapperTraceInfo")
	traceHeader        = http.CanonicalHeaderKey("X-Cloud-Trace-Context")
	curNamespaceHeader = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace")
	userIPHeader       = http.CanonicalHeaderKey("X-AppEngine-User-IP")
	remoteAddrHeader   = http.CanonicalHeaderKey("X-AppEngine-Remote-Addr")

	// Outgoing headers. The api* keys and values are placed on the API
	// request built by post; logFlushHeader is set on the HTTP response by
	// handleHTTP. The keys are canonicalized up front because post stores
	// them directly in an http.Header literal rather than going through Set.
	apiEndpointHeader      = http.CanonicalHeaderKey("X-Google-RPC-Service-Endpoint")
	apiEndpointHeaderValue = []string{"app-engine-apis"}
	apiMethodHeader        = http.CanonicalHeaderKey("X-Google-RPC-Service-Method")
	apiMethodHeaderValue   = []string{"/VMRemoteAPI.CallRemoteAPI"}
	apiDeadlineHeader      = http.CanonicalHeaderKey("X-Google-RPC-Service-Deadline")
	apiContentType         = http.CanonicalHeaderKey("Content-Type")
	apiContentTypeValue    = []string{"application/octet-stream"}
	logFlushHeader         = http.CanonicalHeaderKey("X-AppEngine-Log-Flush-Count")

	// apiHTTPClient is the shared client post uses for every API request.
	// post type-asserts its Transport to *http.Transport in order to cancel
	// requests on timeout, so the Transport must remain that concrete type.
	// limitDial is defined elsewhere in the package.
	apiHTTPClient = &http.Client{
		Transport: &http.Transport{
			Proxy: http.ProxyFromEnvironment,
			Dial:  limitDial,
		},
	}

	// defaultTicketOnce guards the one-time computation of defaultTicket,
	// the cached result returned by DefaultTicket.
	defaultTicketOnce sync.Once
	defaultTicket     string
)
     69 
     70 func apiURL() *url.URL {
     71 	host, port := "appengine.googleapis.internal", "10001"
     72 	if h := os.Getenv("API_HOST"); h != "" {
     73 		host = h
     74 	}
     75 	if p := os.Getenv("API_PORT"); p != "" {
     76 		port = p
     77 	}
     78 	return &url.URL{
     79 		Scheme: "http",
     80 		Host:   host + ":" + port,
     81 		Path:   apiPath,
     82 	}
     83 }
     84 
     85 func handleHTTP(w http.ResponseWriter, r *http.Request) {
     86 	c := &context{
     87 		req:       r,
     88 		outHeader: w.Header(),
     89 		apiURL:    apiURL(),
     90 	}
     91 	stopFlushing := make(chan int)
     92 
     93 	ctxs.Lock()
     94 	ctxs.m[r] = c
     95 	ctxs.Unlock()
     96 	defer func() {
     97 		ctxs.Lock()
     98 		delete(ctxs.m, r)
     99 		ctxs.Unlock()
    100 	}()
    101 
    102 	// Patch up RemoteAddr so it looks reasonable.
    103 	if addr := r.Header.Get(userIPHeader); addr != "" {
    104 		r.RemoteAddr = addr
    105 	} else if addr = r.Header.Get(remoteAddrHeader); addr != "" {
    106 		r.RemoteAddr = addr
    107 	} else {
    108 		// Should not normally reach here, but pick a sensible default anyway.
    109 		r.RemoteAddr = "127.0.0.1"
    110 	}
    111 	// The address in the headers will most likely be of these forms:
    112 	//	123.123.123.123
    113 	//	2001:db8::1
    114 	// net/http.Request.RemoteAddr is specified to be in "IP:port" form.
    115 	if _, _, err := net.SplitHostPort(r.RemoteAddr); err != nil {
    116 		// Assume the remote address is only a host; add a default port.
    117 		r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80")
    118 	}
    119 
    120 	// Start goroutine responsible for flushing app logs.
    121 	// This is done after adding c to ctx.m (and stopped before removing it)
    122 	// because flushing logs requires making an API call.
    123 	go c.logFlusher(stopFlushing)
    124 
    125 	executeRequestSafely(c, r)
    126 	c.outHeader = nil // make sure header changes aren't respected any more
    127 
    128 	stopFlushing <- 1 // any logging beyond this point will be dropped
    129 
    130 	// Flush any pending logs asynchronously.
    131 	c.pendingLogs.Lock()
    132 	flushes := c.pendingLogs.flushes
    133 	if len(c.pendingLogs.lines) > 0 {
    134 		flushes++
    135 	}
    136 	c.pendingLogs.Unlock()
    137 	go c.flushLog(false)
    138 	w.Header().Set(logFlushHeader, strconv.Itoa(flushes))
    139 
    140 	// Avoid nil Write call if c.Write is never called.
    141 	if c.outCode != 0 {
    142 		w.WriteHeader(c.outCode)
    143 	}
    144 	if c.outBody != nil {
    145 		w.Write(c.outBody)
    146 	}
    147 }
    148 
    149 func executeRequestSafely(c *context, r *http.Request) {
    150 	defer func() {
    151 		if x := recover(); x != nil {
    152 			logf(c, 4, "%s", renderPanic(x)) // 4 == critical
    153 			c.outCode = 500
    154 		}
    155 	}()
    156 
    157 	http.DefaultServeMux.ServeHTTP(c, r)
    158 }
    159 
    160 func renderPanic(x interface{}) string {
    161 	buf := make([]byte, 16<<10) // 16 KB should be plenty
    162 	buf = buf[:runtime.Stack(buf, false)]
    163 
    164 	// Remove the first few stack frames:
    165 	//   this func
    166 	//   the recover closure in the caller
    167 	// That will root the stack trace at the site of the panic.
    168 	const (
    169 		skipStart  = "internal.renderPanic"
    170 		skipFrames = 2
    171 	)
    172 	start := bytes.Index(buf, []byte(skipStart))
    173 	p := start
    174 	for i := 0; i < skipFrames*2 && p+1 < len(buf); i++ {
    175 		p = bytes.IndexByte(buf[p+1:], '\n') + p + 1
    176 		if p < 0 {
    177 			break
    178 		}
    179 	}
    180 	if p >= 0 {
    181 		// buf[start:p+1] is the block to remove.
    182 		// Copy buf[p+1:] over buf[start:] and shrink buf.
    183 		copy(buf[start:], buf[p+1:])
    184 		buf = buf[:len(buf)-(p+1-start)]
    185 	}
    186 
    187 	// Add panic heading.
    188 	head := fmt.Sprintf("panic: %v\n\n", x)
    189 	if len(head) > len(buf) {
    190 		// Extremely unlikely to happen.
    191 		return head
    192 	}
    193 	copy(buf[len(head):], buf)
    194 	copy(buf, head)
    195 
    196 	return string(buf)
    197 }
    198 
// ctxs is the package-level registry of request contexts. The code in this
// file accesses its fields only while holding the embedded mutex.
var ctxs = struct {
	sync.Mutex
	// m maps each registered request to its context. Entries are added and
	// removed by handleHTTP and RegisterTestRequest, and looked up by
	// WithContext.
	m  map[*http.Request]*context
	bg *context // background context, lazily initialized
	// dec is used by tests to decorate the netcontext.Context returned
	// for a given request. This allows tests to add overrides (such as
	// WithAppIDOverride) to the context. The map is nil outside tests.
	dec map[*http.Request]func(netcontext.Context) netcontext.Context
}{
	m: make(map[*http.Request]*context),
}
    210 
// context represents the context of an in-flight HTTP request.
// It implements the appengine.Context and http.ResponseWriter interfaces.
type context struct {
	// req is the request this context belongs to; its headers supply the
	// API ticket, trace information and namespace.
	req *http.Request

	// Buffered response. Write and WriteHeader fill in outCode and outBody;
	// handleHTTP copies them to the real ResponseWriter once the handler
	// returns. outHeader is the real writer's header map and is set to nil
	// by handleHTTP after the handler has finished.
	outCode   int
	outHeader http.Header
	outBody   []byte

	// pendingLogs holds log lines queued by addLogLine until flushLog sends
	// them, plus the number of flush attempts made so far. The embedded
	// mutex guards both fields.
	pendingLogs struct {
		sync.Mutex
		lines   []*logpb.UserAppLogLine
		flushes int
	}

	// apiURL is the endpoint that post sends API requests to.
	apiURL *url.URL
}
    228 
// contextKey's address is the key under which withContext stores a *context
// and fromContext retrieves it. Only the address is used, never the string
// value.
var contextKey = "holds a *context"
    230 
    231 // fromContext returns the App Engine context or nil if ctx is not
    232 // derived from an App Engine context.
    233 func fromContext(ctx netcontext.Context) *context {
    234 	c, _ := ctx.Value(&contextKey).(*context)
    235 	return c
    236 }
    237 
    238 func withContext(parent netcontext.Context, c *context) netcontext.Context {
    239 	ctx := netcontext.WithValue(parent, &contextKey, c)
    240 	if ns := c.req.Header.Get(curNamespaceHeader); ns != "" {
    241 		ctx = withNamespace(ctx, ns)
    242 	}
    243 	return ctx
    244 }
    245 
    246 func toContext(c *context) netcontext.Context {
    247 	return withContext(netcontext.Background(), c)
    248 }
    249 
    250 func IncomingHeaders(ctx netcontext.Context) http.Header {
    251 	if c := fromContext(ctx); c != nil {
    252 		return c.req.Header
    253 	}
    254 	return nil
    255 }
    256 
    257 func ReqContext(req *http.Request) netcontext.Context {
    258 	return WithContext(netcontext.Background(), req)
    259 }
    260 
    261 func WithContext(parent netcontext.Context, req *http.Request) netcontext.Context {
    262 	ctxs.Lock()
    263 	c := ctxs.m[req]
    264 	d := ctxs.dec[req]
    265 	ctxs.Unlock()
    266 
    267 	if d != nil {
    268 		parent = d(parent)
    269 	}
    270 
    271 	if c == nil {
    272 		// Someone passed in an http.Request that is not in-flight.
    273 		// We panic here rather than panicking at a later point
    274 		// so that stack traces will be more sensible.
    275 		log.Panic("appengine: NewContext passed an unknown http.Request")
    276 	}
    277 	return withContext(parent, c)
    278 }
    279 
    280 // DefaultTicket returns a ticket used for background context or dev_appserver.
    281 func DefaultTicket() string {
    282 	defaultTicketOnce.Do(func() {
    283 		if IsDevAppServer() {
    284 			defaultTicket = "testapp" + defaultTicketSuffix
    285 			return
    286 		}
    287 		appID := partitionlessAppID()
    288 		escAppID := strings.Replace(strings.Replace(appID, ":", "_", -1), ".", "_", -1)
    289 		majVersion := VersionID(nil)
    290 		if i := strings.Index(majVersion, "."); i > 0 {
    291 			majVersion = majVersion[:i]
    292 		}
    293 		defaultTicket = fmt.Sprintf("%s/%s.%s.%s", escAppID, ModuleName(nil), majVersion, InstanceID())
    294 	})
    295 	return defaultTicket
    296 }
    297 
    298 func BackgroundContext() netcontext.Context {
    299 	ctxs.Lock()
    300 	defer ctxs.Unlock()
    301 
    302 	if ctxs.bg != nil {
    303 		return toContext(ctxs.bg)
    304 	}
    305 
    306 	// Compute background security ticket.
    307 	ticket := DefaultTicket()
    308 
    309 	ctxs.bg = &context{
    310 		req: &http.Request{
    311 			Header: http.Header{
    312 				ticketHeader: []string{ticket},
    313 			},
    314 		},
    315 		apiURL: apiURL(),
    316 	}
    317 
    318 	// TODO(dsymonds): Wire up the shutdown handler to do a final flush.
    319 	go ctxs.bg.logFlusher(make(chan int))
    320 
    321 	return toContext(ctxs.bg)
    322 }
    323 
    324 // RegisterTestRequest registers the HTTP request req for testing, such that
    325 // any API calls are sent to the provided URL. It returns a closure to delete
    326 // the registration.
    327 // It should only be used by aetest package.
    328 func RegisterTestRequest(req *http.Request, apiURL *url.URL, decorate func(netcontext.Context) netcontext.Context) (*http.Request, func()) {
    329 	c := &context{
    330 		req:    req,
    331 		apiURL: apiURL,
    332 	}
    333 	ctxs.Lock()
    334 	defer ctxs.Unlock()
    335 	if _, ok := ctxs.m[req]; ok {
    336 		log.Panic("req already associated with context")
    337 	}
    338 	if _, ok := ctxs.dec[req]; ok {
    339 		log.Panic("req already associated with context")
    340 	}
    341 	if ctxs.dec == nil {
    342 		ctxs.dec = make(map[*http.Request]func(netcontext.Context) netcontext.Context)
    343 	}
    344 	ctxs.m[req] = c
    345 	ctxs.dec[req] = decorate
    346 
    347 	return req, func() {
    348 		ctxs.Lock()
    349 		delete(ctxs.m, req)
    350 		delete(ctxs.dec, req)
    351 		ctxs.Unlock()
    352 	}
    353 }
    354 
// errTimeout is the error post returns when its timeout timer fired before
// the API request completed.
var errTimeout = &CallError{
	Detail:  "Deadline exceeded",
	Code:    int32(remotepb.RpcError_CANCELLED),
	Timeout: true,
}
    360 
    361 func (c *context) Header() http.Header { return c.outHeader }
    362 
    363 // Copied from $GOROOT/src/pkg/net/http/transfer.go. Some response status
    364 // codes do not permit a response body (nor response entity headers such as
    365 // Content-Length, Content-Type, etc).
    366 func bodyAllowedForStatus(status int) bool {
    367 	switch {
    368 	case status >= 100 && status <= 199:
    369 		return false
    370 	case status == 204:
    371 		return false
    372 	case status == 304:
    373 		return false
    374 	}
    375 	return true
    376 }
    377 
    378 func (c *context) Write(b []byte) (int, error) {
    379 	if c.outCode == 0 {
    380 		c.WriteHeader(http.StatusOK)
    381 	}
    382 	if len(b) > 0 && !bodyAllowedForStatus(c.outCode) {
    383 		return 0, http.ErrBodyNotAllowed
    384 	}
    385 	c.outBody = append(c.outBody, b...)
    386 	return len(b), nil
    387 }
    388 
    389 func (c *context) WriteHeader(code int) {
    390 	if c.outCode != 0 {
    391 		logf(c, 3, "WriteHeader called multiple times on request.") // error level
    392 		return
    393 	}
    394 	c.outCode = code
    395 }
    396 
    397 func (c *context) post(body []byte, timeout time.Duration) (b []byte, err error) {
    398 	hreq := &http.Request{
    399 		Method: "POST",
    400 		URL:    c.apiURL,
    401 		Header: http.Header{
    402 			apiEndpointHeader: apiEndpointHeaderValue,
    403 			apiMethodHeader:   apiMethodHeaderValue,
    404 			apiContentType:    apiContentTypeValue,
    405 			apiDeadlineHeader: []string{strconv.FormatFloat(timeout.Seconds(), 'f', -1, 64)},
    406 		},
    407 		Body:          ioutil.NopCloser(bytes.NewReader(body)),
    408 		ContentLength: int64(len(body)),
    409 		Host:          c.apiURL.Host,
    410 	}
    411 	if info := c.req.Header.Get(dapperHeader); info != "" {
    412 		hreq.Header.Set(dapperHeader, info)
    413 	}
    414 	if info := c.req.Header.Get(traceHeader); info != "" {
    415 		hreq.Header.Set(traceHeader, info)
    416 	}
    417 
    418 	tr := apiHTTPClient.Transport.(*http.Transport)
    419 
    420 	var timedOut int32 // atomic; set to 1 if timed out
    421 	t := time.AfterFunc(timeout, func() {
    422 		atomic.StoreInt32(&timedOut, 1)
    423 		tr.CancelRequest(hreq)
    424 	})
    425 	defer t.Stop()
    426 	defer func() {
    427 		// Check if timeout was exceeded.
    428 		if atomic.LoadInt32(&timedOut) != 0 {
    429 			err = errTimeout
    430 		}
    431 	}()
    432 
    433 	hresp, err := apiHTTPClient.Do(hreq)
    434 	if err != nil {
    435 		return nil, &CallError{
    436 			Detail: fmt.Sprintf("service bridge HTTP failed: %v", err),
    437 			Code:   int32(remotepb.RpcError_UNKNOWN),
    438 		}
    439 	}
    440 	defer hresp.Body.Close()
    441 	hrespBody, err := ioutil.ReadAll(hresp.Body)
    442 	if hresp.StatusCode != 200 {
    443 		return nil, &CallError{
    444 			Detail: fmt.Sprintf("service bridge returned HTTP %d (%q)", hresp.StatusCode, hrespBody),
    445 			Code:   int32(remotepb.RpcError_UNKNOWN),
    446 		}
    447 	}
    448 	if err != nil {
    449 		return nil, &CallError{
    450 			Detail: fmt.Sprintf("service bridge response bad: %v", err),
    451 			Code:   int32(remotepb.RpcError_UNKNOWN),
    452 		}
    453 	}
    454 	return hrespBody, nil
    455 }
    456 
    457 func Call(ctx netcontext.Context, service, method string, in, out proto.Message) error {
    458 	if ns := NamespaceFromContext(ctx); ns != "" {
    459 		if fn, ok := NamespaceMods[service]; ok {
    460 			fn(in, ns)
    461 		}
    462 	}
    463 
    464 	if f, ctx, ok := callOverrideFromContext(ctx); ok {
    465 		return f(ctx, service, method, in, out)
    466 	}
    467 
    468 	// Handle already-done contexts quickly.
    469 	select {
    470 	case <-ctx.Done():
    471 		return ctx.Err()
    472 	default:
    473 	}
    474 
    475 	c := fromContext(ctx)
    476 	if c == nil {
    477 		// Give a good error message rather than a panic lower down.
    478 		return errNotAppEngineContext
    479 	}
    480 
    481 	// Apply transaction modifications if we're in a transaction.
    482 	if t := transactionFromContext(ctx); t != nil {
    483 		if t.finished {
    484 			return errors.New("transaction context has expired")
    485 		}
    486 		applyTransaction(in, &t.transaction)
    487 	}
    488 
    489 	// Default RPC timeout is 60s.
    490 	timeout := 60 * time.Second
    491 	if deadline, ok := ctx.Deadline(); ok {
    492 		timeout = deadline.Sub(time.Now())
    493 	}
    494 
    495 	data, err := proto.Marshal(in)
    496 	if err != nil {
    497 		return err
    498 	}
    499 
    500 	ticket := c.req.Header.Get(ticketHeader)
    501 	// Use a test ticket under test environment.
    502 	if ticket == "" {
    503 		if appid := ctx.Value(&appIDOverrideKey); appid != nil {
    504 			ticket = appid.(string) + defaultTicketSuffix
    505 		}
    506 	}
    507 	// Fall back to use background ticket when the request ticket is not available in Flex or dev_appserver.
    508 	if ticket == "" {
    509 		ticket = DefaultTicket()
    510 	}
    511 	req := &remotepb.Request{
    512 		ServiceName: &service,
    513 		Method:      &method,
    514 		Request:     data,
    515 		RequestId:   &ticket,
    516 	}
    517 	hreqBody, err := proto.Marshal(req)
    518 	if err != nil {
    519 		return err
    520 	}
    521 
    522 	hrespBody, err := c.post(hreqBody, timeout)
    523 	if err != nil {
    524 		return err
    525 	}
    526 
    527 	res := &remotepb.Response{}
    528 	if err := proto.Unmarshal(hrespBody, res); err != nil {
    529 		return err
    530 	}
    531 	if res.RpcError != nil {
    532 		ce := &CallError{
    533 			Detail: res.RpcError.GetDetail(),
    534 			Code:   *res.RpcError.Code,
    535 		}
    536 		switch remotepb.RpcError_ErrorCode(ce.Code) {
    537 		case remotepb.RpcError_CANCELLED, remotepb.RpcError_DEADLINE_EXCEEDED:
    538 			ce.Timeout = true
    539 		}
    540 		return ce
    541 	}
    542 	if res.ApplicationError != nil {
    543 		return &APIError{
    544 			Service: *req.ServiceName,
    545 			Detail:  res.ApplicationError.GetDetail(),
    546 			Code:    *res.ApplicationError.Code,
    547 		}
    548 	}
    549 	if res.Exception != nil || res.JavaException != nil {
    550 		// This shouldn't happen, but let's be defensive.
    551 		return &CallError{
    552 			Detail: "service bridge returned exception",
    553 			Code:   int32(remotepb.RpcError_UNKNOWN),
    554 		}
    555 	}
    556 	return proto.Unmarshal(res.Response, out)
    557 }
    558 
    559 func (c *context) Request() *http.Request {
    560 	return c.req
    561 }
    562 
    563 func (c *context) addLogLine(ll *logpb.UserAppLogLine) {
    564 	// Truncate long log lines.
    565 	// TODO(dsymonds): Check if this is still necessary.
    566 	const lim = 8 << 10
    567 	if len(*ll.Message) > lim {
    568 		suffix := fmt.Sprintf("...(length %d)", len(*ll.Message))
    569 		ll.Message = proto.String((*ll.Message)[:lim-len(suffix)] + suffix)
    570 	}
    571 
    572 	c.pendingLogs.Lock()
    573 	c.pendingLogs.lines = append(c.pendingLogs.lines, ll)
    574 	c.pendingLogs.Unlock()
    575 }
    576 
// logLevelName maps the numeric log level accepted by logf to the label that
// logf puts in front of the message it writes through the standard logger.
var logLevelName = map[int64]string{
	0: "DEBUG",
	1: "INFO",
	2: "WARNING",
	3: "ERROR",
	4: "CRITICAL",
}
    584 
    585 func logf(c *context, level int64, format string, args ...interface{}) {
    586 	if c == nil {
    587 		panic("not an App Engine context")
    588 	}
    589 	s := fmt.Sprintf(format, args...)
    590 	s = strings.TrimRight(s, "\n") // Remove any trailing newline characters.
    591 	c.addLogLine(&logpb.UserAppLogLine{
    592 		TimestampUsec: proto.Int64(time.Now().UnixNano() / 1e3),
    593 		Level:         &level,
    594 		Message:       &s,
    595 	})
    596 	log.Print(logLevelName[level] + ": " + s)
    597 }
    598 
    599 // flushLog attempts to flush any pending logs to the appserver.
    600 // It should not be called concurrently.
    601 func (c *context) flushLog(force bool) (flushed bool) {
    602 	c.pendingLogs.Lock()
    603 	// Grab up to 30 MB. We can get away with up to 32 MB, but let's be cautious.
    604 	n, rem := 0, 30<<20
    605 	for ; n < len(c.pendingLogs.lines); n++ {
    606 		ll := c.pendingLogs.lines[n]
    607 		// Each log line will require about 3 bytes of overhead.
    608 		nb := proto.Size(ll) + 3
    609 		if nb > rem {
    610 			break
    611 		}
    612 		rem -= nb
    613 	}
    614 	lines := c.pendingLogs.lines[:n]
    615 	c.pendingLogs.lines = c.pendingLogs.lines[n:]
    616 	c.pendingLogs.Unlock()
    617 
    618 	if len(lines) == 0 && !force {
    619 		// Nothing to flush.
    620 		return false
    621 	}
    622 
    623 	rescueLogs := false
    624 	defer func() {
    625 		if rescueLogs {
    626 			c.pendingLogs.Lock()
    627 			c.pendingLogs.lines = append(lines, c.pendingLogs.lines...)
    628 			c.pendingLogs.Unlock()
    629 		}
    630 	}()
    631 
    632 	buf, err := proto.Marshal(&logpb.UserAppLogGroup{
    633 		LogLine: lines,
    634 	})
    635 	if err != nil {
    636 		log.Printf("internal.flushLog: marshaling UserAppLogGroup: %v", err)
    637 		rescueLogs = true
    638 		return false
    639 	}
    640 
    641 	req := &logpb.FlushRequest{
    642 		Logs: buf,
    643 	}
    644 	res := &basepb.VoidProto{}
    645 	c.pendingLogs.Lock()
    646 	c.pendingLogs.flushes++
    647 	c.pendingLogs.Unlock()
    648 	if err := Call(toContext(c), "logservice", "Flush", req, res); err != nil {
    649 		log.Printf("internal.flushLog: Flush RPC: %v", err)
    650 		rescueLogs = true
    651 		return false
    652 	}
    653 	return true
    654 }
    655 
const (
	// Log flushing parameters. logFlusher attempts a flush every
	// flushInterval, and forces one (even with nothing pending) once more
	// than forceFlushInterval has passed since the last successful flush.
	flushInterval      = 1 * time.Second
	forceFlushInterval = 60 * time.Second
)
    661 
    662 func (c *context) logFlusher(stop <-chan int) {
    663 	lastFlush := time.Now()
    664 	tick := time.NewTicker(flushInterval)
    665 	for {
    666 		select {
    667 		case <-stop:
    668 			// Request finished.
    669 			tick.Stop()
    670 			return
    671 		case <-tick.C:
    672 			force := time.Now().Sub(lastFlush) > forceFlushInterval
    673 			if c.flushLog(force) {
    674 				lastFlush = time.Now()
    675 			}
    676 		}
    677 	}
    678 }
    679 
    680 func ContextForTesting(req *http.Request) netcontext.Context {
    681 	return toContext(&context{req: req})
    682 }
    683