Home | History | Annotate | Download | only in runtime
      1 // Copyright 2009 The Go Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style
      3 // license that can be found in the LICENSE file.
      4 
      5 // Semaphore implementation exposed to Go.
      6 // Intended use is provide a sleep and wakeup
      7 // primitive that can be used in the contended case
      8 // of other synchronization primitives.
      9 // Thus it targets the same goal as Linux's futex,
     10 // but it has much simpler semantics.
     11 //
     12 // That is, don't think of these as semaphores.
     13 // Think of them as a way to implement sleep and wakeup
     14 // such that every sleep is paired with a single wakeup,
     15 // even if, due to races, the wakeup happens before the sleep.
     16 //
     17 // See Mullender and Cox, ``Semaphores in Plan 9,''
     18 // http://swtch.com/semaphore.pdf
     19 
     20 package runtime
     21 
     22 import (
     23 	"runtime/internal/atomic"
     24 	"runtime/internal/sys"
     25 	"unsafe"
     26 )
     27 
     28 // Asynchronous semaphore for sync.Mutex.
     29 
     30 // A semaRoot holds a balanced tree of sudog with distinct addresses (s.elem).
     31 // Each of those sudog may in turn point (through s.waitlink) to a list
     32 // of other sudogs waiting on the same address.
     33 // The operations on the inner lists of sudogs with the same address
     34 // are all O(1). The scanning of the top-level semaRoot list is O(log n),
     35 // where n is the number of distinct addresses with goroutines blocked
     36 // on them that hash to the given semaRoot.
     37 // See golang.org/issue/17953 for a program that worked badly
     38 // before we introduced the second level of list, and test/locklinear.go
     39 // for a test that exercises this.
     40 type semaRoot struct {
     41 	lock  mutex
     42 	treap *sudog // root of balanced tree of unique waiters.
     43 	nwait uint32 // Number of waiters. Read w/o the lock.
     44 }
     45 
     46 // Prime to not correlate with any user patterns.
     47 const semTabSize = 251
     48 
     49 var semtable [semTabSize]struct {
     50 	root semaRoot
     51 	pad  [sys.CacheLineSize - unsafe.Sizeof(semaRoot{})]byte
     52 }
     53 
     54 //go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
     55 func sync_runtime_Semacquire(addr *uint32) {
     56 	semacquire1(addr, false, semaBlockProfile)
     57 }
     58 
     59 //go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire
     60 func poll_runtime_Semacquire(addr *uint32) {
     61 	semacquire1(addr, false, semaBlockProfile)
     62 }
     63 
     64 //go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
     65 func sync_runtime_Semrelease(addr *uint32, handoff bool) {
     66 	semrelease1(addr, handoff)
     67 }
     68 
     69 //go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
     70 func sync_runtime_SemacquireMutex(addr *uint32, lifo bool) {
     71 	semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile)
     72 }
     73 
     74 //go:linkname poll_runtime_Semrelease internal/poll.runtime_Semrelease
     75 func poll_runtime_Semrelease(addr *uint32) {
     76 	semrelease(addr)
     77 }
     78 
     79 func readyWithTime(s *sudog, traceskip int) {
     80 	if s.releasetime != 0 {
     81 		s.releasetime = cputicks()
     82 	}
     83 	goready(s.g, traceskip)
     84 }
     85 
     86 type semaProfileFlags int
     87 
     88 const (
     89 	semaBlockProfile semaProfileFlags = 1 << iota
     90 	semaMutexProfile
     91 )
     92 
     93 // Called from runtime.
     94 func semacquire(addr *uint32) {
     95 	semacquire1(addr, false, 0)
     96 }
     97 
     98 func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags) {
     99 	gp := getg()
    100 	if gp != gp.m.curg {
    101 		throw("semacquire not on the G stack")
    102 	}
    103 
    104 	// Easy case.
    105 	if cansemacquire(addr) {
    106 		return
    107 	}
    108 
    109 	// Harder case:
    110 	//	increment waiter count
    111 	//	try cansemacquire one more time, return if succeeded
    112 	//	enqueue itself as a waiter
    113 	//	sleep
    114 	//	(waiter descriptor is dequeued by signaler)
    115 	s := acquireSudog()
    116 	root := semroot(addr)
    117 	t0 := int64(0)
    118 	s.releasetime = 0
    119 	s.acquiretime = 0
    120 	s.ticket = 0
    121 	if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
    122 		t0 = cputicks()
    123 		s.releasetime = -1
    124 	}
    125 	if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
    126 		if t0 == 0 {
    127 			t0 = cputicks()
    128 		}
    129 		s.acquiretime = t0
    130 	}
    131 	for {
    132 		lock(&root.lock)
    133 		// Add ourselves to nwait to disable "easy case" in semrelease.
    134 		atomic.Xadd(&root.nwait, 1)
    135 		// Check cansemacquire to avoid missed wakeup.
    136 		if cansemacquire(addr) {
    137 			atomic.Xadd(&root.nwait, -1)
    138 			unlock(&root.lock)
    139 			break
    140 		}
    141 		// Any semrelease after the cansemacquire knows we're waiting
    142 		// (we set nwait above), so go to sleep.
    143 		root.queue(addr, s, lifo)
    144 		goparkunlock(&root.lock, "semacquire", traceEvGoBlockSync, 4)
    145 		if s.ticket != 0 || cansemacquire(addr) {
    146 			break
    147 		}
    148 	}
    149 	if s.releasetime > 0 {
    150 		blockevent(s.releasetime-t0, 3)
    151 	}
    152 	releaseSudog(s)
    153 }
    154 
    155 func semrelease(addr *uint32) {
    156 	semrelease1(addr, false)
    157 }
    158 
    159 func semrelease1(addr *uint32, handoff bool) {
    160 	root := semroot(addr)
    161 	atomic.Xadd(addr, 1)
    162 
    163 	// Easy case: no waiters?
    164 	// This check must happen after the xadd, to avoid a missed wakeup
    165 	// (see loop in semacquire).
    166 	if atomic.Load(&root.nwait) == 0 {
    167 		return
    168 	}
    169 
    170 	// Harder case: search for a waiter and wake it.
    171 	lock(&root.lock)
    172 	if atomic.Load(&root.nwait) == 0 {
    173 		// The count is already consumed by another goroutine,
    174 		// so no need to wake up another goroutine.
    175 		unlock(&root.lock)
    176 		return
    177 	}
    178 	s, t0 := root.dequeue(addr)
    179 	if s != nil {
    180 		atomic.Xadd(&root.nwait, -1)
    181 	}
    182 	unlock(&root.lock)
    183 	if s != nil { // May be slow, so unlock first
    184 		acquiretime := s.acquiretime
    185 		if acquiretime != 0 {
    186 			mutexevent(t0-acquiretime, 3)
    187 		}
    188 		if s.ticket != 0 {
    189 			throw("corrupted semaphore ticket")
    190 		}
    191 		if handoff && cansemacquire(addr) {
    192 			s.ticket = 1
    193 		}
    194 		readyWithTime(s, 5)
    195 	}
    196 }
    197 
    198 func semroot(addr *uint32) *semaRoot {
    199 	return &semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
    200 }
    201 
    202 func cansemacquire(addr *uint32) bool {
    203 	for {
    204 		v := atomic.Load(addr)
    205 		if v == 0 {
    206 			return false
    207 		}
    208 		if atomic.Cas(addr, v, v-1) {
    209 			return true
    210 		}
    211 	}
    212 }
    213 
    214 // queue adds s to the blocked goroutines in semaRoot.
    215 func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
    216 	s.g = getg()
    217 	s.elem = unsafe.Pointer(addr)
    218 	s.next = nil
    219 	s.prev = nil
    220 
    221 	var last *sudog
    222 	pt := &root.treap
    223 	for t := *pt; t != nil; t = *pt {
    224 		if t.elem == unsafe.Pointer(addr) {
    225 			// Already have addr in list.
    226 			if lifo {
    227 				// Substitute s in t's place in treap.
    228 				*pt = s
    229 				s.ticket = t.ticket
    230 				s.acquiretime = t.acquiretime
    231 				s.parent = t.parent
    232 				s.prev = t.prev
    233 				s.next = t.next
    234 				if s.prev != nil {
    235 					s.prev.parent = s
    236 				}
    237 				if s.next != nil {
    238 					s.next.parent = s
    239 				}
    240 				// Add t first in s's wait list.
    241 				s.waitlink = t
    242 				s.waittail = t.waittail
    243 				if s.waittail == nil {
    244 					s.waittail = t
    245 				}
    246 				t.parent = nil
    247 				t.prev = nil
    248 				t.next = nil
    249 				t.waittail = nil
    250 			} else {
    251 				// Add s to end of t's wait list.
    252 				if t.waittail == nil {
    253 					t.waitlink = s
    254 				} else {
    255 					t.waittail.waitlink = s
    256 				}
    257 				t.waittail = s
    258 				s.waitlink = nil
    259 			}
    260 			return
    261 		}
    262 		last = t
    263 		if uintptr(unsafe.Pointer(addr)) < uintptr(t.elem) {
    264 			pt = &t.prev
    265 		} else {
    266 			pt = &t.next
    267 		}
    268 	}
    269 
    270 	// Add s as new leaf in tree of unique addrs.
    271 	// The balanced tree is a treap using ticket as the random heap priority.
    272 	// That is, it is a binary tree ordered according to the elem addresses,
    273 	// but then among the space of possible binary trees respecting those
    274 	// addresses, it is kept balanced on average by maintaining a heap ordering
    275 	// on the ticket: s.ticket <= both s.prev.ticket and s.next.ticket.
    276 	// https://en.wikipedia.org/wiki/Treap
    277 	// http://faculty.washington.edu/aragon/pubs/rst89.pdf
    278 	//
    279 	// s.ticket compared with zero in couple of places, therefore set lowest bit.
    280 	// It will not affect treap's quality noticeably.
    281 	s.ticket = fastrand() | 1
    282 	s.parent = last
    283 	*pt = s
    284 
    285 	// Rotate up into tree according to ticket (priority).
    286 	for s.parent != nil && s.parent.ticket > s.ticket {
    287 		if s.parent.prev == s {
    288 			root.rotateRight(s.parent)
    289 		} else {
    290 			if s.parent.next != s {
    291 				panic("semaRoot queue")
    292 			}
    293 			root.rotateLeft(s.parent)
    294 		}
    295 	}
    296 }
    297 
    298 // dequeue searches for and finds the first goroutine
    299 // in semaRoot blocked on addr.
    300 // If the sudog was being profiled, dequeue returns the time
    301 // at which it was woken up as now. Otherwise now is 0.
    302 func (root *semaRoot) dequeue(addr *uint32) (found *sudog, now int64) {
    303 	ps := &root.treap
    304 	s := *ps
    305 	for ; s != nil; s = *ps {
    306 		if s.elem == unsafe.Pointer(addr) {
    307 			goto Found
    308 		}
    309 		if uintptr(unsafe.Pointer(addr)) < uintptr(s.elem) {
    310 			ps = &s.prev
    311 		} else {
    312 			ps = &s.next
    313 		}
    314 	}
    315 	return nil, 0
    316 
    317 Found:
    318 	now = int64(0)
    319 	if s.acquiretime != 0 {
    320 		now = cputicks()
    321 	}
    322 	if t := s.waitlink; t != nil {
    323 		// Substitute t, also waiting on addr, for s in root tree of unique addrs.
    324 		*ps = t
    325 		t.ticket = s.ticket
    326 		t.parent = s.parent
    327 		t.prev = s.prev
    328 		if t.prev != nil {
    329 			t.prev.parent = t
    330 		}
    331 		t.next = s.next
    332 		if t.next != nil {
    333 			t.next.parent = t
    334 		}
    335 		if t.waitlink != nil {
    336 			t.waittail = s.waittail
    337 		} else {
    338 			t.waittail = nil
    339 		}
    340 		t.acquiretime = now
    341 		s.waitlink = nil
    342 		s.waittail = nil
    343 	} else {
    344 		// Rotate s down to be leaf of tree for removal, respecting priorities.
    345 		for s.next != nil || s.prev != nil {
    346 			if s.next == nil || s.prev != nil && s.prev.ticket < s.next.ticket {
    347 				root.rotateRight(s)
    348 			} else {
    349 				root.rotateLeft(s)
    350 			}
    351 		}
    352 		// Remove s, now a leaf.
    353 		if s.parent != nil {
    354 			if s.parent.prev == s {
    355 				s.parent.prev = nil
    356 			} else {
    357 				s.parent.next = nil
    358 			}
    359 		} else {
    360 			root.treap = nil
    361 		}
    362 	}
    363 	s.parent = nil
    364 	s.elem = nil
    365 	s.next = nil
    366 	s.prev = nil
    367 	s.ticket = 0
    368 	return s, now
    369 }
    370 
    371 // rotateLeft rotates the tree rooted at node x.
    372 // turning (x a (y b c)) into (y (x a b) c).
    373 func (root *semaRoot) rotateLeft(x *sudog) {
    374 	// p -> (x a (y b c))
    375 	p := x.parent
    376 	a, y := x.prev, x.next
    377 	b, c := y.prev, y.next
    378 
    379 	y.prev = x
    380 	x.parent = y
    381 	y.next = c
    382 	if c != nil {
    383 		c.parent = y
    384 	}
    385 	x.prev = a
    386 	if a != nil {
    387 		a.parent = x
    388 	}
    389 	x.next = b
    390 	if b != nil {
    391 		b.parent = x
    392 	}
    393 
    394 	y.parent = p
    395 	if p == nil {
    396 		root.treap = y
    397 	} else if p.prev == x {
    398 		p.prev = y
    399 	} else {
    400 		if p.next != x {
    401 			throw("semaRoot rotateLeft")
    402 		}
    403 		p.next = y
    404 	}
    405 }
    406 
    407 // rotateRight rotates the tree rooted at node y.
    408 // turning (y (x a b) c) into (x a (y b c)).
    409 func (root *semaRoot) rotateRight(y *sudog) {
    410 	// p -> (y (x a b) c)
    411 	p := y.parent
    412 	x, c := y.prev, y.next
    413 	a, b := x.prev, x.next
    414 
    415 	x.prev = a
    416 	if a != nil {
    417 		a.parent = x
    418 	}
    419 	x.next = y
    420 	y.parent = x
    421 	y.prev = b
    422 	if b != nil {
    423 		b.parent = y
    424 	}
    425 	y.next = c
    426 	if c != nil {
    427 		c.parent = y
    428 	}
    429 
    430 	x.parent = p
    431 	if p == nil {
    432 		root.treap = x
    433 	} else if p.prev == y {
    434 		p.prev = x
    435 	} else {
    436 		if p.next != y {
    437 			throw("semaRoot rotateRight")
    438 		}
    439 		p.next = x
    440 	}
    441 }
    442 
    443 // notifyList is a ticket-based notification list used to implement sync.Cond.
    444 //
    445 // It must be kept in sync with the sync package.
    446 type notifyList struct {
    447 	// wait is the ticket number of the next waiter. It is atomically
    448 	// incremented outside the lock.
    449 	wait uint32
    450 
    451 	// notify is the ticket number of the next waiter to be notified. It can
    452 	// be read outside the lock, but is only written to with lock held.
    453 	//
    454 	// Both wait & notify can wrap around, and such cases will be correctly
    455 	// handled as long as their "unwrapped" difference is bounded by 2^31.
    456 	// For this not to be the case, we'd need to have 2^31+ goroutines
    457 	// blocked on the same condvar, which is currently not possible.
    458 	notify uint32
    459 
    460 	// List of parked waiters.
    461 	lock mutex
    462 	head *sudog
    463 	tail *sudog
    464 }
    465 
    466 // less checks if a < b, considering a & b running counts that may overflow the
    467 // 32-bit range, and that their "unwrapped" difference is always less than 2^31.
    468 func less(a, b uint32) bool {
    469 	return int32(a-b) < 0
    470 }
    471 
    472 // notifyListAdd adds the caller to a notify list such that it can receive
    473 // notifications. The caller must eventually call notifyListWait to wait for
    474 // such a notification, passing the returned ticket number.
    475 //go:linkname notifyListAdd sync.runtime_notifyListAdd
    476 func notifyListAdd(l *notifyList) uint32 {
    477 	// This may be called concurrently, for example, when called from
    478 	// sync.Cond.Wait while holding a RWMutex in read mode.
    479 	return atomic.Xadd(&l.wait, 1) - 1
    480 }
    481 
    482 // notifyListWait waits for a notification. If one has been sent since
    483 // notifyListAdd was called, it returns immediately. Otherwise, it blocks.
    484 //go:linkname notifyListWait sync.runtime_notifyListWait
    485 func notifyListWait(l *notifyList, t uint32) {
    486 	lock(&l.lock)
    487 
    488 	// Return right away if this ticket has already been notified.
    489 	if less(t, l.notify) {
    490 		unlock(&l.lock)
    491 		return
    492 	}
    493 
    494 	// Enqueue itself.
    495 	s := acquireSudog()
    496 	s.g = getg()
    497 	s.ticket = t
    498 	s.releasetime = 0
    499 	t0 := int64(0)
    500 	if blockprofilerate > 0 {
    501 		t0 = cputicks()
    502 		s.releasetime = -1
    503 	}
    504 	if l.tail == nil {
    505 		l.head = s
    506 	} else {
    507 		l.tail.next = s
    508 	}
    509 	l.tail = s
    510 	goparkunlock(&l.lock, "semacquire", traceEvGoBlockCond, 3)
    511 	if t0 != 0 {
    512 		blockevent(s.releasetime-t0, 2)
    513 	}
    514 	releaseSudog(s)
    515 }
    516 
    517 // notifyListNotifyAll notifies all entries in the list.
    518 //go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
    519 func notifyListNotifyAll(l *notifyList) {
    520 	// Fast-path: if there are no new waiters since the last notification
    521 	// we don't need to acquire the lock.
    522 	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
    523 		return
    524 	}
    525 
    526 	// Pull the list out into a local variable, waiters will be readied
    527 	// outside the lock.
    528 	lock(&l.lock)
    529 	s := l.head
    530 	l.head = nil
    531 	l.tail = nil
    532 
    533 	// Update the next ticket to be notified. We can set it to the current
    534 	// value of wait because any previous waiters are already in the list
    535 	// or will notice that they have already been notified when trying to
    536 	// add themselves to the list.
    537 	atomic.Store(&l.notify, atomic.Load(&l.wait))
    538 	unlock(&l.lock)
    539 
    540 	// Go through the local list and ready all waiters.
    541 	for s != nil {
    542 		next := s.next
    543 		s.next = nil
    544 		readyWithTime(s, 4)
    545 		s = next
    546 	}
    547 }
    548 
    549 // notifyListNotifyOne notifies one entry in the list.
    550 //go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
    551 func notifyListNotifyOne(l *notifyList) {
    552 	// Fast-path: if there are no new waiters since the last notification
    553 	// we don't need to acquire the lock at all.
    554 	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
    555 		return
    556 	}
    557 
    558 	lock(&l.lock)
    559 
    560 	// Re-check under the lock if we need to do anything.
    561 	t := l.notify
    562 	if t == atomic.Load(&l.wait) {
    563 		unlock(&l.lock)
    564 		return
    565 	}
    566 
    567 	// Update the next notify ticket number.
    568 	atomic.Store(&l.notify, t+1)
    569 
    570 	// Try to find the g that needs to be notified.
    571 	// If it hasn't made it to the list yet we won't find it,
    572 	// but it won't park itself once it sees the new notify number.
    573 	//
    574 	// This scan looks linear but essentially always stops quickly.
    575 	// Because g's queue separately from taking numbers,
    576 	// there may be minor reorderings in the list, but we
    577 	// expect the g we're looking for to be near the front.
    578 	// The g has others in front of it on the list only to the
    579 	// extent that it lost the race, so the iteration will not
    580 	// be too long. This applies even when the g is missing:
    581 	// it hasn't yet gotten to sleep and has lost the race to
    582 	// the (few) other g's that we find on the list.
    583 	for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
    584 		if s.ticket == t {
    585 			n := s.next
    586 			if p != nil {
    587 				p.next = n
    588 			} else {
    589 				l.head = n
    590 			}
    591 			if n == nil {
    592 				l.tail = p
    593 			}
    594 			unlock(&l.lock)
    595 			s.next = nil
    596 			readyWithTime(s, 4)
    597 			return
    598 		}
    599 	}
    600 	unlock(&l.lock)
    601 }
    602 
    603 //go:linkname notifyListCheck sync.runtime_notifyListCheck
    604 func notifyListCheck(sz uintptr) {
    605 	if sz != unsafe.Sizeof(notifyList{}) {
    606 		print("runtime: bad notifyList size - sync=", sz, " runtime=", unsafe.Sizeof(notifyList{}), "\n")
    607 		throw("bad notifyList size")
    608 	}
    609 }
    610 
    611 //go:linkname sync_nanotime sync.runtime_nanotime
    612 func sync_nanotime() int64 {
    613 	return nanotime()
    614 }
    615