Home | History | Annotate | Download | only in runtime
      1 // Copyright 2009 The Go Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style
      3 // license that can be found in the LICENSE file.
      4 
      5 // Semaphore implementation exposed to Go.
      6 // Intended use is to provide a sleep and wakeup
      7 // primitive that can be used in the contended case
      8 // of other synchronization primitives.
      9 // Thus it targets the same goal as Linux's futex,
     10 // but it has much simpler semantics.
     11 //
     12 // That is, don't think of these as semaphores.
     13 // Think of them as a way to implement sleep and wakeup
     14 // such that every sleep is paired with a single wakeup,
     15 // even if, due to races, the wakeup happens before the sleep.
     16 //
     17 // See Mullender and Cox, ``Semaphores in Plan 9,''
     18 // http://swtch.com/semaphore.pdf
     19 
     20 package runtime
     21 
     22 import (
     23 	"runtime/internal/atomic"
     24 	"runtime/internal/sys"
     25 	"unsafe"
     26 )
     27 
     28 // Asynchronous semaphore for sync.Mutex.
     29 
// semaRoot is one bucket of semtable. It holds a doubly linked list of
// sudogs (linked through sudog.next and sudog.prev) for goroutines parked
// in semacquire on any semaphore address that semroot maps to this bucket.
type semaRoot struct {
	// lock is held by semacquire and semrelease while they edit or scan
	// the head/tail list.
	lock  mutex
	// head is the first queued waiter; semrelease scans from here.
	head  *sudog
	// tail is the last queued waiter; queue appends here.
	tail  *sudog
	// nwait is updated with atomic.Xadd and read with atomic.Load so that
	// semrelease can test it before taking lock.
	nwait uint32 // Number of waiters. Read w/o the lock.
}
     36 
// semTabSize is the number of entries in semtable.
// Prime to not correlate with any user patterns.
const semTabSize = 251
     39 
// semtable is the fixed-size table of semaphore roots. semroot selects an
// entry from a semaphore's address, so distinct semaphores may share a root.
var semtable [semTabSize]struct {
	root semaRoot
	// pad rounds each entry up to sys.CacheLineSize bytes
	// (presumably so neighbouring roots do not share a cache line — confirm).
	pad  [sys.CacheLineSize - unsafe.Sizeof(semaRoot{})]byte
}
     44 
// sync_runtime_Semacquire is linked into package sync as runtime_Semacquire.
// It acquires the semaphore at addr via semacquire, requesting block
// profiling of the wait.
//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
func sync_runtime_Semacquire(addr *uint32) {
	semacquire(addr, semaBlockProfile)
}
     49 
// net_runtime_Semacquire is linked into package net as runtime_Semacquire.
// It acquires the semaphore at addr via semacquire, requesting block
// profiling of the wait.
//go:linkname net_runtime_Semacquire net.runtime_Semacquire
func net_runtime_Semacquire(addr *uint32) {
	semacquire(addr, semaBlockProfile)
}
     54 
// sync_runtime_Semrelease is linked into package sync as runtime_Semrelease.
// It releases the semaphore at addr via semrelease.
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32) {
	semrelease(addr)
}
     59 
// sync_runtime_SemacquireMutex is linked into package sync as
// runtime_SemacquireMutex. It behaves like sync_runtime_Semacquire but also
// passes semaMutexProfile, so the wait is eligible for mutex profiling as
// well as block profiling.
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32) {
	semacquire(addr, semaBlockProfile|semaMutexProfile)
}
     64 
// net_runtime_Semrelease is linked into package net as runtime_Semrelease.
// It releases the semaphore at addr via semrelease.
//go:linkname net_runtime_Semrelease net.runtime_Semrelease
func net_runtime_Semrelease(addr *uint32) {
	semrelease(addr)
}
     69 
     70 func readyWithTime(s *sudog, traceskip int) {
     71 	if s.releasetime != 0 {
     72 		s.releasetime = cputicks()
     73 	}
     74 	goready(s.g, traceskip)
     75 }
     76 
     77 type semaProfileFlags int
     78 
     79 const (
     80 	semaBlockProfile semaProfileFlags = 1 << iota
     81 	semaMutexProfile
     82 )
     83 
     84 // Called from runtime.
     85 func semacquire(addr *uint32, profile semaProfileFlags) {
     86 	gp := getg()
     87 	if gp != gp.m.curg {
     88 		throw("semacquire not on the G stack")
     89 	}
     90 
     91 	// Easy case.
     92 	if cansemacquire(addr) {
     93 		return
     94 	}
     95 
     96 	// Harder case:
     97 	//	increment waiter count
     98 	//	try cansemacquire one more time, return if succeeded
     99 	//	enqueue itself as a waiter
    100 	//	sleep
    101 	//	(waiter descriptor is dequeued by signaler)
    102 	s := acquireSudog()
    103 	root := semroot(addr)
    104 	t0 := int64(0)
    105 	s.releasetime = 0
    106 	s.acquiretime = 0
    107 	if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
    108 		t0 = cputicks()
    109 		s.releasetime = -1
    110 	}
    111 	if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
    112 		if t0 == 0 {
    113 			t0 = cputicks()
    114 		}
    115 		s.acquiretime = t0
    116 	}
    117 	for {
    118 		lock(&root.lock)
    119 		// Add ourselves to nwait to disable "easy case" in semrelease.
    120 		atomic.Xadd(&root.nwait, 1)
    121 		// Check cansemacquire to avoid missed wakeup.
    122 		if cansemacquire(addr) {
    123 			atomic.Xadd(&root.nwait, -1)
    124 			unlock(&root.lock)
    125 			break
    126 		}
    127 		// Any semrelease after the cansemacquire knows we're waiting
    128 		// (we set nwait above), so go to sleep.
    129 		root.queue(addr, s)
    130 		goparkunlock(&root.lock, "semacquire", traceEvGoBlockSync, 4)
    131 		if cansemacquire(addr) {
    132 			break
    133 		}
    134 	}
    135 	if s.releasetime > 0 {
    136 		blockevent(s.releasetime-t0, 3)
    137 	}
    138 	releaseSudog(s)
    139 }
    140 
    141 func semrelease(addr *uint32) {
    142 	root := semroot(addr)
    143 	atomic.Xadd(addr, 1)
    144 
    145 	// Easy case: no waiters?
    146 	// This check must happen after the xadd, to avoid a missed wakeup
    147 	// (see loop in semacquire).
    148 	if atomic.Load(&root.nwait) == 0 {
    149 		return
    150 	}
    151 
    152 	// Harder case: search for a waiter and wake it.
    153 	lock(&root.lock)
    154 	if atomic.Load(&root.nwait) == 0 {
    155 		// The count is already consumed by another goroutine,
    156 		// so no need to wake up another goroutine.
    157 		unlock(&root.lock)
    158 		return
    159 	}
    160 	s := root.head
    161 	for ; s != nil; s = s.next {
    162 		if s.elem == unsafe.Pointer(addr) {
    163 			atomic.Xadd(&root.nwait, -1)
    164 			root.dequeue(s)
    165 			break
    166 		}
    167 	}
    168 	if s != nil {
    169 		if s.acquiretime != 0 {
    170 			t0 := cputicks()
    171 			for x := root.head; x != nil; x = x.next {
    172 				if x.elem == unsafe.Pointer(addr) {
    173 					x.acquiretime = t0
    174 				}
    175 			}
    176 			mutexevent(t0-s.acquiretime, 3)
    177 		}
    178 	}
    179 	unlock(&root.lock)
    180 	if s != nil { // May be slow, so unlock first
    181 		readyWithTime(s, 5)
    182 	}
    183 }
    184 
    185 func semroot(addr *uint32) *semaRoot {
    186 	return &semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
    187 }
    188 
    189 func cansemacquire(addr *uint32) bool {
    190 	for {
    191 		v := atomic.Load(addr)
    192 		if v == 0 {
    193 			return false
    194 		}
    195 		if atomic.Cas(addr, v, v-1) {
    196 			return true
    197 		}
    198 	}
    199 }
    200 
    201 func (root *semaRoot) queue(addr *uint32, s *sudog) {
    202 	s.g = getg()
    203 	s.elem = unsafe.Pointer(addr)
    204 	s.next = nil
    205 	s.prev = root.tail
    206 	if root.tail != nil {
    207 		root.tail.next = s
    208 	} else {
    209 		root.head = s
    210 	}
    211 	root.tail = s
    212 }
    213 
    214 func (root *semaRoot) dequeue(s *sudog) {
    215 	if s.next != nil {
    216 		s.next.prev = s.prev
    217 	} else {
    218 		root.tail = s.prev
    219 	}
    220 	if s.prev != nil {
    221 		s.prev.next = s.next
    222 	} else {
    223 		root.head = s.next
    224 	}
    225 	s.elem = nil
    226 	s.next = nil
    227 	s.prev = nil
    228 }
    229 
// notifyList is a ticket-based notification list used to implement sync.Cond.
//
// It must be kept in sync with the sync package; notifyListCheck compares
// the size reported by the caller against this struct's size.
type notifyList struct {
	// wait is the ticket number of the next waiter. It is atomically
	// incremented outside the lock (see notifyListAdd).
	wait uint32

	// notify is the ticket number of the next waiter to be notified. It can
	// be read outside the lock, but is only written to with lock held.
	//
	// Both wait & notify can wrap around, and such cases will be correctly
	// handled as long as their "unwrapped" difference is bounded by 2^31.
	// For this not to be the case, we'd need to have 2^31+ goroutines
	// blocked on the same condvar, which is currently not possible.
	notify uint32

	// List of parked waiters, singly linked through sudog.next.
	// notifyListWait appends at tail; the notify functions remove entries
	// while holding lock.
	lock mutex
	head *sudog
	tail *sudog
}
    252 
    253 // less checks if a < b, considering a & b running counts that may overflow the
    254 // 32-bit range, and that their "unwrapped" difference is always less than 2^31.
    255 func less(a, b uint32) bool {
    256 	return int32(a-b) < 0
    257 }
    258 
    259 // notifyListAdd adds the caller to a notify list such that it can receive
    260 // notifications. The caller must eventually call notifyListWait to wait for
    261 // such a notification, passing the returned ticket number.
    262 //go:linkname notifyListAdd sync.runtime_notifyListAdd
    263 func notifyListAdd(l *notifyList) uint32 {
    264 	// This may be called concurrently, for example, when called from
    265 	// sync.Cond.Wait while holding a RWMutex in read mode.
    266 	return atomic.Xadd(&l.wait, 1) - 1
    267 }
    268 
    269 // notifyListWait waits for a notification. If one has been sent since
    270 // notifyListAdd was called, it returns immediately. Otherwise, it blocks.
    271 //go:linkname notifyListWait sync.runtime_notifyListWait
    272 func notifyListWait(l *notifyList, t uint32) {
    273 	lock(&l.lock)
    274 
    275 	// Return right away if this ticket has already been notified.
    276 	if less(t, l.notify) {
    277 		unlock(&l.lock)
    278 		return
    279 	}
    280 
    281 	// Enqueue itself.
    282 	s := acquireSudog()
    283 	s.g = getg()
    284 	s.ticket = t
    285 	s.releasetime = 0
    286 	t0 := int64(0)
    287 	if blockprofilerate > 0 {
    288 		t0 = cputicks()
    289 		s.releasetime = -1
    290 	}
    291 	if l.tail == nil {
    292 		l.head = s
    293 	} else {
    294 		l.tail.next = s
    295 	}
    296 	l.tail = s
    297 	goparkunlock(&l.lock, "semacquire", traceEvGoBlockCond, 3)
    298 	if t0 != 0 {
    299 		blockevent(s.releasetime-t0, 2)
    300 	}
    301 	releaseSudog(s)
    302 }
    303 
    304 // notifyListNotifyAll notifies all entries in the list.
    305 //go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
    306 func notifyListNotifyAll(l *notifyList) {
    307 	// Fast-path: if there are no new waiters since the last notification
    308 	// we don't need to acquire the lock.
    309 	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
    310 		return
    311 	}
    312 
    313 	// Pull the list out into a local variable, waiters will be readied
    314 	// outside the lock.
    315 	lock(&l.lock)
    316 	s := l.head
    317 	l.head = nil
    318 	l.tail = nil
    319 
    320 	// Update the next ticket to be notified. We can set it to the current
    321 	// value of wait because any previous waiters are already in the list
    322 	// or will notice that they have already been notified when trying to
    323 	// add themselves to the list.
    324 	atomic.Store(&l.notify, atomic.Load(&l.wait))
    325 	unlock(&l.lock)
    326 
    327 	// Go through the local list and ready all waiters.
    328 	for s != nil {
    329 		next := s.next
    330 		s.next = nil
    331 		readyWithTime(s, 4)
    332 		s = next
    333 	}
    334 }
    335 
    336 // notifyListNotifyOne notifies one entry in the list.
    337 //go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
    338 func notifyListNotifyOne(l *notifyList) {
    339 	// Fast-path: if there are no new waiters since the last notification
    340 	// we don't need to acquire the lock at all.
    341 	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
    342 		return
    343 	}
    344 
    345 	lock(&l.lock)
    346 
    347 	// Re-check under the lock if we need to do anything.
    348 	t := l.notify
    349 	if t == atomic.Load(&l.wait) {
    350 		unlock(&l.lock)
    351 		return
    352 	}
    353 
    354 	// Update the next notify ticket number, and try to find the G that
    355 	// needs to be notified. If it hasn't made it to the list yet we won't
    356 	// find it, but it won't park itself once it sees the new notify number.
    357 	atomic.Store(&l.notify, t+1)
    358 	for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
    359 		if s.ticket == t {
    360 			n := s.next
    361 			if p != nil {
    362 				p.next = n
    363 			} else {
    364 				l.head = n
    365 			}
    366 			if n == nil {
    367 				l.tail = p
    368 			}
    369 			unlock(&l.lock)
    370 			s.next = nil
    371 			readyWithTime(s, 4)
    372 			return
    373 		}
    374 	}
    375 	unlock(&l.lock)
    376 }
    377 
    378 //go:linkname notifyListCheck sync.runtime_notifyListCheck
    379 func notifyListCheck(sz uintptr) {
    380 	if sz != unsafe.Sizeof(notifyList{}) {
    381 		print("runtime: bad notifyList size - sync=", sz, " runtime=", unsafe.Sizeof(notifyList{}), "\n")
    382 		throw("bad notifyList size")
    383 	}
    384 }
    385