// Copyright 2017 The Go Authors.  All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package runtime

import (
	"runtime/internal/atomic"
	"unsafe"
)

// A profBuf is a lock-free buffer for profiling events,
// safe for concurrent use by one reader and one writer.
// The writer may be a signal handler running without a user g.
// The reader is assumed to be a user g.
//
// Each logged event corresponds to a fixed size header, a list of
// uintptrs (typically a stack), and exactly one unsafe.Pointer tag.
// The header and uintptrs are stored in the circular buffer data and the
// tag is stored in a circular buffer tags, running in parallel.
// In the circular buffer data, each event takes 2+hdrsize+len(stk)
// words: the value 2+hdrsize+len(stk), then the time of the event, then
// hdrsize words giving the fixed-size header, and then len(stk) words
// for the stack.
//
// The current effective offsets into the tags and data circular buffers
// for reading and writing are stored in the high 30 and low 32 bits of r and w.
// The bottom bits of the high 32 are additional flag bits in w, unused in r.
// "Effective" offsets means the total number of reads or writes, mod 2^length.
// The offset in the buffer is the effective offset mod the length of the buffer.
// To make wraparound mod 2^length match wraparound mod length of the buffer,
// the length of the buffer must be a power of two.
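//
// Concretely (a sketch of the packing, matching dataCount, tagCount, and the
// flag constants below):
//
//	bits 34..63: tag count (30 bits, wraps at 2^30)
//	bits 32..33: flag bits (profReaderSleeping, profWriteExtra), used only in w
//	bits  0..31: data count (32 bits, wraps at 2^32)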
//
// If the reader catches up to the writer, a flag passed to read controls
// whether the read blocks until more data is available. A read returns a
// pointer to the buffer data itself; the caller is assumed to be done with
// that data at the next read. The read offset rNext tracks the next offset to
// be returned by read. By definition, r ≤ rNext ≤ w (before wraparound),
// and rNext is only used by the reader, so it can be accessed without atomics.
//
// If the writer gets ahead of the reader, so that the buffer fills,
// future writes are discarded and replaced in the output stream by an
// overflow entry, which has size 2+hdrsize+1, time set to the time of
// the first discarded write, a header of all zeroed words, and a "stack"
// containing one word, the number of discarded writes.
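//
// For example (a sketch, with hdrsize = 1): an overflow entry recording N
// discarded writes, the first of which was dropped at time t, occupies four
// words in data:
//
//	[4, t, 0, N]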
//
// Between the time the buffer fills and the buffer becomes empty enough
// to hold more data, the overflow entry is stored as a pending overflow
// entry in the fields overflow and overflowTime. The pending overflow
// entry can be turned into a real record by either the writer or the
// reader. If the writer is called to write a new record and finds that
// the output buffer has room for both the pending overflow entry and the
// new record, the writer emits the pending overflow entry and the new
// record into the buffer. If the reader is called to read data and finds
// that the output buffer is empty but that there is a pending overflow
// entry, the reader will return a synthesized record for the pending
// overflow entry.
//
// Only the writer can create or add to a pending overflow entry, but
// either the reader or the writer can clear the pending overflow entry.
// A pending overflow entry is indicated by the low 32 bits of 'overflow'
// holding the number of discarded writes, and overflowTime holding the
// time of the first discarded write. The high 32 bits of 'overflow'
// increment each time the low 32 bits transition from zero to non-zero
// or vice versa. This sequence number avoids ABA problems in the use of
// compare-and-swap to coordinate between reader and writer.
// The overflowTime is only written when the low 32 bits of overflow are
// zero, that is, only when there is no pending overflow entry, in
// preparation for creating a new one. The reader can therefore fetch and
// clear the entry atomically using
//
//	for {
//		overflow = load(&b.overflow)
//		if uint32(overflow) == 0 {
//			// no pending entry
//			break
//		}
//		time = load(&b.overflowTime)
//		if cas(&b.overflow, overflow, ((overflow>>32)+1)<<32) {
//			// pending entry cleared
//			break
//		}
//	}
//	if uint32(overflow) > 0 {
//		emit entry for uint32(overflow), time
//	}
//
type profBuf struct {
	// accessed atomically
	r, w         profAtomic
	overflow     uint64
	overflowTime uint64
	eof          uint32

	// immutable (excluding slice content)
	hdrsize uintptr
	data    []uint64
	tags    []unsafe.Pointer

	// owned by reader
	rNext       profIndex
	overflowBuf []uint64 // for use by reader to return overflow record
	wait        note
}

// A profAtomic is the atomically-accessed word holding a profIndex.
type profAtomic uint64

// A profIndex is the packed tag and data counts and flag bits, described above.
type profIndex uint64

const (
	profReaderSleeping profIndex = 1 << 32 // reader is sleeping and must be woken up
	profWriteExtra     profIndex = 1 << 33 // overflow or eof waiting
)

func (x *profAtomic) load() profIndex {
	return profIndex(atomic.Load64((*uint64)(x)))
}

func (x *profAtomic) store(new profIndex) {
	atomic.Store64((*uint64)(x), uint64(new))
}

func (x *profAtomic) cas(old, new profIndex) bool {
	return atomic.Cas64((*uint64)(x), uint64(old), uint64(new))
}

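// dataCount returns the count of data words, packed in the low 32 bits of x.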
func (x profIndex) dataCount() uint32 {
	return uint32(x)
}

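// tagCount returns the count of tags, packed in the high 30 bits of x.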
func (x profIndex) tagCount() uint32 {
	return uint32(x >> 34)
}

// countSub subtracts two counts obtained from profIndex.dataCount or profIndex.tagCount,
// assuming that they are no more than 2^29 apart (guaranteed since they are never more than
// len(data) or len(tags) apart, respectively).
// tagCount wraps at 2^30, while dataCount wraps at 2^32.
// This function works for both.
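//
// For example (a worked case added here for illustration): with tag counts,
// which wrap at 2^30, x = 2 and y = 2^30-2 give x-y = 0xC0000004 as a uint32.
// Shifting left then right by 2 discards the two high bits and sign-extends,
// yielding 4, the true wrapped distance from y to x.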
func countSub(x, y uint32) int {
	// x-y is 32-bit signed or 30-bit signed; sign-extend to 32 bits and convert to int.
	return int(int32(x-y) << 2 >> 2)
}

// addCountsAndClearFlags returns the packed form of "x + (data, tag) - all flags".
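// Shifting the packed tag count down by 34 and back up drops the flag bits
// (bits 32 and 33), so the result always has profReaderSleeping and
// profWriteExtra cleared.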
func (x profIndex) addCountsAndClearFlags(data, tag int) profIndex {
	return profIndex((uint64(x)>>34+uint64(uint32(tag)<<2>>2))<<34 | uint64(uint32(x)+uint32(data)))
}

// hasOverflow reports whether b has any overflow records pending.
func (b *profBuf) hasOverflow() bool {
	return uint32(atomic.Load64(&b.overflow)) > 0
}

// takeOverflow consumes the pending overflow records, returning the overflow count
// and the time of the first overflow.
// When called by the reader, it is racing against incrementOverflow.
func (b *profBuf) takeOverflow() (count uint32, time uint64) {
	overflow := atomic.Load64(&b.overflow)
	time = atomic.Load64(&b.overflowTime)
	for {
		count = uint32(overflow)
		if count == 0 {
			time = 0
			break
		}
		// Increment generation, clear overflow count in low bits.
		if atomic.Cas64(&b.overflow, overflow, ((overflow>>32)+1)<<32) {
			break
		}
		overflow = atomic.Load64(&b.overflow)
		time = atomic.Load64(&b.overflowTime)
	}
	return uint32(overflow), time
}

// incrementOverflow records a single overflow at time now.
// It is racing against a possible takeOverflow in the reader.
func (b *profBuf) incrementOverflow(now int64) {
	for {
		overflow := atomic.Load64(&b.overflow)

		// Once we see b.overflow reach 0, it's stable: no one else is changing it underfoot.
		// We need to set overflowTime if we're incrementing b.overflow from 0.
		if uint32(overflow) == 0 {
			// Store overflowTime first so it's always available when overflow != 0.
			atomic.Store64(&b.overflowTime, uint64(now))
			atomic.Store64(&b.overflow, (((overflow>>32)+1)<<32)+1)
			break
		}
		// Otherwise we're racing to increment against reader
		// who wants to set b.overflow to 0.
		// Out of paranoia, leave 2^32-1 as a sticky overflow value,
		// to avoid wrapping around. Extremely unlikely.
		if int32(overflow) == -1 {
			break
		}
		if atomic.Cas64(&b.overflow, overflow, overflow+1) {
			break
		}
	}
}

// newProfBuf returns a new profiling buffer with room for
// a header of hdrsize words and a buffer of at least bufwords words.
func newProfBuf(hdrsize, bufwords, tags int) *profBuf {
	if min := 2 + hdrsize + 1; bufwords < min {
		bufwords = min
	}

	// Buffer sizes must be power of two, so that we don't have to
	// worry about uint32 wraparound changing the effective position
	// within the buffers. We store 30 bits of count; limiting to 28
	// gives us some room for intermediate calculations.
	if bufwords >= 1<<28 || tags >= 1<<28 {
		throw("newProfBuf: buffer too large")
	}
	var i int
	for i = 1; i < bufwords; i <<= 1 {
	}
	bufwords = i
	for i = 1; i < tags; i <<= 1 {
	}
	tags = i

	b := new(profBuf)
	b.hdrsize = uintptr(hdrsize)
	b.data = make([]uint64, bufwords)
	b.tags = make([]unsafe.Pointer, tags)
	b.overflowBuf = make([]uint64, 2+b.hdrsize+1)
	return b
}
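
// As a usage sketch (the sizes here are illustrative, not taken from this
// file): a caller such as the CPU profiler allocates a buffer along the
// lines of
//
//	b := newProfBuf(1, 1<<17, 1<<14) // 1 header word, 128K data words, 16K tag slots
//
// then calls b.write from the profiling signal handler and b.read from a
// goroutine that drains the profile.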

// canWriteRecord reports whether the buffer has room
// for a single contiguous record with a stack of length nstk.
func (b *profBuf) canWriteRecord(nstk int) bool {
	br := b.r.load()
	bw := b.w.load()

	// room for tag?
	if countSub(br.tagCount(), bw.tagCount())+len(b.tags) < 1 {
		return false
	}

	// room for data?
	nd := countSub(br.dataCount(), bw.dataCount()) + len(b.data)
	want := 2 + int(b.hdrsize) + nstk
	i := int(bw.dataCount() % uint32(len(b.data)))
	if i+want > len(b.data) {
		// Can't fit in trailing fragment of slice.
		// Skip over that and start over at beginning of slice.
		nd -= len(b.data) - i
	}
	return nd >= want
}

// canWriteTwoRecords reports whether the buffer has room
// for two records with stack lengths nstk1, nstk2, in that order.
// Each record must be contiguous on its own, but the two
// records need not be contiguous (one can be at the end of the buffer
// and the other can wrap around and start at the beginning of the buffer).
func (b *profBuf) canWriteTwoRecords(nstk1, nstk2 int) bool {
	br := b.r.load()
	bw := b.w.load()

	// room for tag?
	if countSub(br.tagCount(), bw.tagCount())+len(b.tags) < 2 {
		return false
	}

	// room for data?
	nd := countSub(br.dataCount(), bw.dataCount()) + len(b.data)

	// first record
	want := 2 + int(b.hdrsize) + nstk1
	i := int(bw.dataCount() % uint32(len(b.data)))
	if i+want > len(b.data) {
		// Can't fit in trailing fragment of slice.
		// Skip over that and start over at beginning of slice.
		nd -= len(b.data) - i
		i = 0
	}
	i += want
	nd -= want

	// second record
	want = 2 + int(b.hdrsize) + nstk2
	if i+want > len(b.data) {
		// Can't fit in trailing fragment of slice.
		// Skip over that and start over at beginning of slice.
		nd -= len(b.data) - i
		i = 0
	}
	return nd >= want
}

// write writes an entry to the profiling buffer b.
// The entry begins with a fixed hdr, which must have
// length b.hdrsize, followed by a variable-sized stack
// and a single tag pointer *tagPtr (or nil if tagPtr is nil).
// No write barriers allowed because this might be called from a signal handler.
func (b *profBuf) write(tagPtr *unsafe.Pointer, now int64, hdr []uint64, stk []uintptr) {
	if b == nil {
		return
	}
	if len(hdr) > int(b.hdrsize) {
		throw("misuse of profBuf.write")
	}

	if hasOverflow := b.hasOverflow(); hasOverflow && b.canWriteTwoRecords(1, len(stk)) {
		// Room for both an overflow record and the one being written.
		// Write the overflow record if the reader hasn't gotten to it yet.
		// Only racing against reader, not other writers.
		count, time := b.takeOverflow()
		if count > 0 {
			var stk [1]uintptr
			stk[0] = uintptr(count)
			b.write(nil, int64(time), nil, stk[:])
		}
	} else if hasOverflow || !b.canWriteRecord(len(stk)) {
		// Pending overflow without room to write overflow and new records
		// or no overflow but also no room for new record.
		b.incrementOverflow(now)
		b.wakeupExtra()
		return
	}

	// There's room: write the record.
	br := b.r.load()
	bw := b.w.load()

	// Profiling tag
	//
	// The tag is a pointer, but we can't run a write barrier here.
	// We have interrupted the OS-level execution of gp, but the
	// runtime still sees gp as executing. In effect, we are running
	// in place of the real gp. Since gp is the only goroutine that
	// can overwrite gp.labels, the value of gp.labels is stable during
	// this signal handler: it will still be reachable from gp when
	// we finish executing. If a GC is in progress right now, it must
	// keep gp.labels alive, because gp.labels is reachable from gp.
	// If gp were to overwrite gp.labels, the deletion barrier would
	// still shade that pointer, which would preserve it for the
	// in-progress GC, so all is well. Any future GC will see the
	// value we copied when scanning b.tags (heap-allocated).
	// We arrange that the store here is always overwriting a nil,
	// so there is no need for a deletion barrier on b.tags[wt].
	wt := int(bw.tagCount() % uint32(len(b.tags)))
	if tagPtr != nil {
		*(*uintptr)(unsafe.Pointer(&b.tags[wt])) = uintptr(unsafe.Pointer(*tagPtr))
	}

	// Main record.
	// It has to fit in a contiguous section of the slice, so if it doesn't fit at the end,
	// leave a rewind marker (0) and start over at the beginning of the slice.
	wd := int(bw.dataCount() % uint32(len(b.data)))
	nd := countSub(br.dataCount(), bw.dataCount()) + len(b.data)
	skip := 0
	if wd+2+int(b.hdrsize)+len(stk) > len(b.data) {
		b.data[wd] = 0
		skip = len(b.data) - wd
		nd -= skip
		wd = 0
	}
	data := b.data[wd:]
	data[0] = uint64(2 + b.hdrsize + uintptr(len(stk))) // length
	data[1] = uint64(now)                               // time stamp
	// header, zero-padded
	i := uintptr(copy(data[2:2+b.hdrsize], hdr))
	for ; i < b.hdrsize; i++ {
		data[2+i] = 0
	}
	for i, pc := range stk {
		data[2+b.hdrsize+uintptr(i)] = uint64(pc)
	}

	for {
		// Commit write.
		// Racing with reader setting flag bits in b.w, to avoid lost wakeups.
		old := b.w.load()
		new := old.addCountsAndClearFlags(skip+2+len(stk)+int(b.hdrsize), 1)
		if !b.w.cas(old, new) {
			continue
		}
		// If there was a reader, wake it up.
		if old&profReaderSleeping != 0 {
			notewakeup(&b.wait)
		}
		break
	}
}
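
// As a caller-side sketch (illustrative, not part of this file): the CPU
// profiler's signal handler writes a sample roughly as
//
//	hdr := [1]uint64{1} // assumed 1-word header holding a sample count
//	b.write(&gp.labels, nanotime(), hdr[:], stk)
//
// so each record carries the interrupted goroutine's label set as its tag.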

// close signals that there will be no more writes on the buffer.
// Once all the data has been read from the buffer, reads will return eof=true.
func (b *profBuf) close() {
	if atomic.Load(&b.eof) > 0 {
		throw("runtime: profBuf already closed")
	}
	atomic.Store(&b.eof, 1)
	b.wakeupExtra()
}

// wakeupExtra must be called after setting one of the "extra"
// atomic fields b.overflow or b.eof.
// It records the change in b.w and wakes up the reader if needed.
func (b *profBuf) wakeupExtra() {
	for {
		old := b.w.load()
		new := old | profWriteExtra
		if !b.w.cas(old, new) {
			continue
		}
		if old&profReaderSleeping != 0 {
			notewakeup(&b.wait)
		}
		break
	}
}

// profBufReadMode specifies whether to block when no data is available to read.
type profBufReadMode int

const (
	profBufBlocking profBufReadMode = iota
	profBufNonBlocking
)

var overflowTag [1]unsafe.Pointer // always nil

func (b *profBuf) read(mode profBufReadMode) (data []uint64, tags []unsafe.Pointer, eof bool) {
	if b == nil {
		return nil, nil, true
	}

	br := b.rNext

	// Commit previous read, returning that part of the ring to the writer.
	// First clear tags that have now been read, both to avoid holding
	// up the memory they point at for longer than necessary
	// and so that b.write can assume it is always overwriting
	// nil tag entries (see comment in b.write).
	rPrev := b.r.load()
	if rPrev != br {
		ntag := countSub(br.tagCount(), rPrev.tagCount())
		ti := int(rPrev.tagCount() % uint32(len(b.tags)))
		for i := 0; i < ntag; i++ {
			b.tags[ti] = nil
			if ti++; ti == len(b.tags) {
				ti = 0
			}
		}
		b.r.store(br)
	}

Read:
	bw := b.w.load()
	numData := countSub(bw.dataCount(), br.dataCount())
	if numData == 0 {
		if b.hasOverflow() {
			// No data to read, but there is overflow to report.
			// Racing with writer flushing b.overflow into a real record.
			count, time := b.takeOverflow()
			if count == 0 {
				// Lost the race, go around again.
				goto Read
			}
			// Won the race, report overflow.
			dst := b.overflowBuf
			dst[0] = uint64(2 + b.hdrsize + 1)
			dst[1] = uint64(time)
			for i := uintptr(0); i < b.hdrsize; i++ {
				dst[2+i] = 0
			}
			dst[2+b.hdrsize] = uint64(count)
			return dst[:2+b.hdrsize+1], overflowTag[:1], false
		}
		if atomic.Load(&b.eof) > 0 {
			// No data, no overflow, EOF set: done.
			return nil, nil, true
		}
		if bw&profWriteExtra != 0 {
			// Writer claims to have published extra information (overflow or eof).
			// Attempt to clear notification and then check again.
			// If we fail to clear the notification it means b.w changed,
			// so we still need to check again.
			b.w.cas(bw, bw&^profWriteExtra)
			goto Read
		}

		// Nothing to read right now.
		// Return or sleep according to mode.
		if mode == profBufNonBlocking {
			return nil, nil, false
		}
		if !b.w.cas(bw, bw|profReaderSleeping) {
			goto Read
		}
		// Committed to sleeping.
		notetsleepg(&b.wait, -1)
		noteclear(&b.wait)
		goto Read
	}
	data = b.data[br.dataCount()%uint32(len(b.data)):]
	if len(data) > numData {
		data = data[:numData]
	} else {
		numData -= len(data) // available in case of wraparound
	}
	skip := 0
	if data[0] == 0 {
		// Wraparound record. Go back to the beginning of the ring.
		skip = len(data)
		data = b.data
		if len(data) > numData {
			data = data[:numData]
		}
	}

	ntag := countSub(bw.tagCount(), br.tagCount())
	if ntag == 0 {
		throw("runtime: malformed profBuf buffer - tag and data out of sync")
	}
	tags = b.tags[br.tagCount()%uint32(len(b.tags)):]
	if len(tags) > ntag {
		tags = tags[:ntag]
	}

	// Count out whole data records until either data or tags is done.
	// They are always in sync in the buffer, but due to an end-of-slice
	// wraparound we might need to stop early and return the rest
	// in the next call.
	di := 0
	ti := 0
	for di < len(data) && data[di] != 0 && ti < len(tags) {
		if uintptr(di)+uintptr(data[di]) > uintptr(len(data)) {
			throw("runtime: malformed profBuf buffer - invalid size")
		}
		di += int(data[di])
		ti++
	}

	// Remember how much we returned, to commit read on next call.
	b.rNext = br.addCountsAndClearFlags(skip+di, ti)

	if raceenabled {
		// Match racereleasemerge in runtime_setProfLabel,
		// so that the setting of the labels in runtime_setProfLabel
		// is treated as happening before any use of the labels
		// by our caller. The synchronization on labelSync itself is a fiction
		// for the race detector. The actual synchronization is handled
		// by the fact that the signal handler only reads from the current
		// goroutine and uses atomics to write the updated queue indices,
		// and then the read-out from the signal handler buffer uses
		// atomics to read those queue indices.
		raceacquire(unsafe.Pointer(&labelSync))
	}

	return data[:di], tags[:ti], false
}
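
// As a reader-side sketch (the loop is illustrative, not part of this file):
// each record in the returned data slice is data[i] words long, laid out as
// [length, time, hdrsize header words, stack PCs...], and tags[j] is the tag
// for the j'th record:
//
//	data, tags, eof := b.read(profBufBlocking)
//	_ = eof // true once the buffer is closed and drained
//	for i, j := 0, 0; i < len(data); j++ {
//		n := int(data[i])
//		rec := data[i : i+n]
//		_ = rec[1]               // time stamp
//		_ = rec[2 : 2+b.hdrsize] // header words
//		_ = rec[2+b.hdrsize:]    // stack PCs
//		_ = tags[j]              // tag (nil for an overflow record)
//		i += n
//	}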