Home | History | Annotate | Download | only in runtime
      1 // Copyright 2014 The Go Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style
      3 // license that can be found in the LICENSE file.
      4 
      5 package runtime
      6 
      7 // This file contains the implementation of Go channels.
      8 
      9 import "unsafe"
     10 
const (
	// maxAlign is the largest element alignment makechan accepts; it throws
	// "makechan: bad alignment" when elem.align is larger.
	maxAlign  = 8
	// hchanSize is unsafe.Sizeof(hchan{}) rounded up to the next multiple of
	// maxAlign: (-size)&(maxAlign-1) is the number of padding bytes needed to
	// reach that multiple (maxAlign is a power of two). makechan places the
	// element buffer at this offset when header and buffer share one allocation.
	hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
	// debugChan turns on the print calls in makechan, chansend and chanrecv.
	debugChan = false
)
     16 
// hchan is the header of a channel. makechan creates it; chansend, chanrecv
// and closechan operate on it. For buffered channels, buf is used as a
// circular queue of dataqsiz slots indexed by sendx and recvx (both wrap to 0
// when they reach dataqsiz). When makechan allocates no separate buffer
// (zero capacity or zero-sized elements), buf points at the hchan itself.
type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16         // size in bytes of one element; makechan sets it from elem.size
	closed   uint32         // 0 while open; closechan sets it to 1
	elemtype *_type         // element type
	sendx    uint           // send index
	recvx    uint           // receive index
	recvq    waitq          // list of recv waiters
	sendq    waitq          // list of send waiters
	lock     mutex          // acquired by chansend, chanrecv and closechan before they update the fields above
}
     30 
// waitq is a queue of sudogs linked through their next/prev fields.
// enqueue appends at last; dequeue removes from first. Both ends are nil
// when the queue is empty.
type waitq struct {
	first *sudog // oldest waiter; the next one dequeue returns
	last  *sudog // newest waiter; where enqueue appends
}
     35 
// reflect_makechan is exposed to package reflect as reflect.makechan through
// the go:linkname directive below. It forwards its arguments to makechan
// unchanged and returns the new channel.
//
//go:linkname reflect_makechan reflect.makechan
func reflect_makechan(t *chantype, size int64) *hchan {
	return makechan(t, size)
}
     40 
     41 func makechan(t *chantype, size int64) *hchan {
     42 	elem := t.elem
     43 
     44 	// compiler checks this but be safe.
     45 	if elem.size >= 1<<16 {
     46 		throw("makechan: invalid channel element type")
     47 	}
     48 	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
     49 		throw("makechan: bad alignment")
     50 	}
     51 	if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/uintptr(elem.size)) {
     52 		panic("makechan: size out of range")
     53 	}
     54 
     55 	var c *hchan
     56 	if elem.kind&kindNoPointers != 0 || size == 0 {
     57 		// Allocate memory in one call.
     58 		// Hchan does not contain pointers interesting for GC in this case:
     59 		// buf points into the same allocation, elemtype is persistent.
     60 		// SudoG's are referenced from their owning thread so they can't be collected.
     61 		// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
     62 		c = (*hchan)(mallocgc(hchanSize+uintptr(size)*uintptr(elem.size), nil, flagNoScan))
     63 		if size > 0 && elem.size != 0 {
     64 			c.buf = add(unsafe.Pointer(c), hchanSize)
     65 		} else {
     66 			// race detector uses this location for synchronization
     67 			// Also prevents us from pointing beyond the allocation (see issue 9401).
     68 			c.buf = unsafe.Pointer(c)
     69 		}
     70 	} else {
     71 		c = new(hchan)
     72 		c.buf = newarray(elem, uintptr(size))
     73 	}
     74 	c.elemsize = uint16(elem.size)
     75 	c.elemtype = elem
     76 	c.dataqsiz = uint(size)
     77 
     78 	if debugChan {
     79 		print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
     80 	}
     81 	return c
     82 }
     83 
     84 // chanbuf(c, i) is pointer to the i'th slot in the buffer.
     85 func chanbuf(c *hchan, i uint) unsafe.Pointer {
     86 	return add(c.buf, uintptr(i)*uintptr(c.elemsize))
     87 }
     88 
// chansend1 is the entry point for c <- x from compiled code.
// It performs a blocking send (block == true) of the value at elem on c,
// passing its own caller's PC to chansend, and discards chansend's result.
//go:nosplit
func chansend1(t *chantype, c *hchan, elem unsafe.Pointer) {
	chansend(t, c, elem, true, getcallerpc(unsafe.Pointer(&t)))
}
     94 
/*
 * generic single channel send/recv
 * If block is false,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 *
 * chansend sends the value at ep on c. It returns true once the value has
 * been handed to a receiver or stored in the buffer, and false only when
 * block is false and the send could not proceed. It panics with
 * "send on closed channel" if c is closed. A blocking send on a nil
 * channel parks and is not expected to return. callerpc is used only for
 * race-detector reporting.
 */
func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if raceenabled {
		raceReadObjectPC(t.elem, ep, callerpc, funcPC(chansend))
	}

	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(unsafe.Pointer(c), callerpc, funcPC(chansend))
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation.
	if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
		(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
		return false
	}

	// t0 is the start time for block profiling; it stays 0 when profiling is off.
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)
	if c.closed != 0 {
		unlock(&c.lock)
		panic("send on closed channel")
	}

	if c.dataqsiz == 0 { // synchronous channel
		sg := c.recvq.dequeue()
		if sg != nil { // found a waiting receiver
			if raceenabled {
				racesync(c, sg)
			}
			unlock(&c.lock)

			recvg := sg.g
			// sg.elem is nil when the receiver passed no destination
			// (see chanrecv, which stores its ep there); skip the copy then.
			if sg.elem != nil {
				syncsend(c, sg, ep)
			}
			// A non-nil param tells the woken receiver that data was delivered.
			recvg.param = unsafe.Pointer(sg)
			if sg.releasetime != 0 {
				sg.releasetime = cputicks()
			}
			goready(recvg, 3)
			return true
		}

		if !block {
			unlock(&c.lock)
			return false
		}

		// no receiver available: block on this channel.
		gp := getg()
		mysg := acquireSudog()
		mysg.releasetime = 0
		if t0 != 0 {
			// Non-zero asks whoever wakes us to store the wake-up time here.
			mysg.releasetime = -1
		}
		mysg.elem = ep
		mysg.waitlink = nil
		gp.waiting = mysg
		mysg.g = gp
		mysg.selectdone = nil
		gp.param = nil
		c.sendq.enqueue(mysg)
		goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

		// someone woke us up.
		if mysg != gp.waiting {
			throw("G waiting list is corrupted!")
		}
		gp.waiting = nil
		// param is still nil when closechan woke us; a receiver sets it to the sudog.
		if gp.param == nil {
			if c.closed == 0 {
				throw("chansend: spurious wakeup")
			}
			// NOTE(review): mysg has not been passed to releaseSudog on this
			// path — confirm whether that is intentional.
			panic("send on closed channel")
		}
		gp.param = nil
		if mysg.releasetime > 0 {
			blockevent(int64(mysg.releasetime)-t0, 2)
		}
		releaseSudog(mysg)
		return true
	}

	// asynchronous channel
	// wait for some space to write our data
	// t1 records the latest wake-up time written into our sudog, for block profiling.
	var t1 int64
	// futile is 0 on the first pass and traceFutileWakeup on every later pass,
	// i.e. whenever we were woken but the buffer is still full.
	for futile := byte(0); c.qcount >= c.dataqsiz; futile = traceFutileWakeup {
		if !block {
			unlock(&c.lock)
			return false
		}
		gp := getg()
		mysg := acquireSudog()
		mysg.releasetime = 0
		if t0 != 0 {
			mysg.releasetime = -1
		}
		mysg.g = gp
		mysg.elem = nil
		mysg.selectdone = nil
		c.sendq.enqueue(mysg)
		goparkunlock(&c.lock, "chan send", traceEvGoBlockSend|futile, 3)

		// someone woke us up - try again
		if mysg.releasetime > 0 {
			t1 = mysg.releasetime
		}
		releaseSudog(mysg)
		lock(&c.lock)
		if c.closed != 0 {
			unlock(&c.lock)
			panic("send on closed channel")
		}
	}

	// write our data into the channel buffer
	if raceenabled {
		raceacquire(chanbuf(c, c.sendx))
		racerelease(chanbuf(c, c.sendx))
	}
	typedmemmove(c.elemtype, chanbuf(c, c.sendx), ep)
	// Advance the send index, wrapping around at the end of the buffer.
	c.sendx++
	if c.sendx == c.dataqsiz {
		c.sendx = 0
	}
	c.qcount++

	// wake up a waiting receiver
	sg := c.recvq.dequeue()
	if sg != nil {
		recvg := sg.g
		unlock(&c.lock)
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		goready(recvg, 3)
	} else {
		unlock(&c.lock)
	}
	if t1 > 0 {
		blockevent(t1-t0, 2)
	}
	return true
}
    279 
// syncsend copies one element of type c.elemtype from elem into sg.elem,
// the destination registered by a waiting receiver, and then clears sg.elem.
// chansend calls it only when sg.elem is non-nil.
func syncsend(c *hchan, sg *sudog, elem unsafe.Pointer) {
	// Send on unbuffered channel is the only operation
	// in the entire runtime where one goroutine
	// writes to the stack of another goroutine. The GC assumes that
	// stack writes only happen when the goroutine is running and are
	// only done by that goroutine. Using a write barrier is sufficient to
	// make up for violating that assumption, but the write barrier has to work.
	// typedmemmove will call heapBitsBulkBarrier, but the target bytes
	// are not in the heap, so that will not help. We arrange to call
	// memmove and typeBitsBulkBarrier instead.
	memmove(sg.elem, elem, c.elemtype.size)
	typeBitsBulkBarrier(c.elemtype, uintptr(sg.elem), c.elemtype.size)
	sg.elem = nil
}
    294 
    295 func closechan(c *hchan) {
    296 	if c == nil {
    297 		panic("close of nil channel")
    298 	}
    299 
    300 	lock(&c.lock)
    301 	if c.closed != 0 {
    302 		unlock(&c.lock)
    303 		panic("close of closed channel")
    304 	}
    305 
    306 	if raceenabled {
    307 		callerpc := getcallerpc(unsafe.Pointer(&c))
    308 		racewritepc(unsafe.Pointer(c), callerpc, funcPC(closechan))
    309 		racerelease(unsafe.Pointer(c))
    310 	}
    311 
    312 	c.closed = 1
    313 
    314 	// release all readers
    315 	for {
    316 		sg := c.recvq.dequeue()
    317 		if sg == nil {
    318 			break
    319 		}
    320 		gp := sg.g
    321 		sg.elem = nil
    322 		gp.param = nil
    323 		if sg.releasetime != 0 {
    324 			sg.releasetime = cputicks()
    325 		}
    326 		goready(gp, 3)
    327 	}
    328 
    329 	// release all writers
    330 	for {
    331 		sg := c.sendq.dequeue()
    332 		if sg == nil {
    333 			break
    334 		}
    335 		gp := sg.g
    336 		sg.elem = nil
    337 		gp.param = nil
    338 		if sg.releasetime != 0 {
    339 			sg.releasetime = cputicks()
    340 		}
    341 		goready(gp, 3)
    342 	}
    343 	unlock(&c.lock)
    344 }
    345 
// entry points for <- c from compiled code
//
// chanrecv1 performs a blocking receive (block == true) from c into elem
// and discards both results of chanrecv.
//go:nosplit
func chanrecv1(t *chantype, c *hchan, elem unsafe.Pointer) {
	chanrecv(t, c, elem, true)
}
    351 
// chanrecv2 performs a blocking receive (block == true) from c into elem and
// returns chanrecv's second result: true when an element was received, false
// when the receive completed because c is closed.
//go:nosplit
func chanrecv2(t *chantype, c *hchan, elem unsafe.Pointer) (received bool) {
	_, received = chanrecv(t, c, elem, true)
	return
}
    357 
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-blocking receive on a nil channel returns (false, false); a blocking
// one parks and is not expected to return.
func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
		throw("unreachable")
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not ready for receiving, we observe that the
	// channel is not closed. Each of these observations is a single word-sized read
	// (first c.sendq.first or c.qcount, and second c.closed).
	// Because a channel cannot be reopened, the later observation of the channel
	// being not closed implies that it was also not closed at the moment of the
	// first observation. We behave as if we observed the channel at that moment
	// and report that the receive cannot proceed.
	//
	// The order of operations is important here: reversing the operations can lead to
	// incorrect behavior when racing with a close.
	if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
		c.dataqsiz > 0 && atomicloaduint(&c.qcount) == 0) &&
		atomicload(&c.closed) == 0 {
		return
	}

	// t0 is the start time for block profiling; it stays 0 when profiling is off.
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)
	if c.dataqsiz == 0 { // synchronous channel
		if c.closed != 0 {
			// recvclosed releases c.lock.
			return recvclosed(c, ep)
		}

		sg := c.sendq.dequeue()
		if sg != nil {
			if raceenabled {
				racesync(c, sg)
			}
			unlock(&c.lock)

			// Copy straight from the location the sender registered in sg.elem.
			if ep != nil {
				typedmemmove(c.elemtype, ep, sg.elem)
			}
			sg.elem = nil
			gp := sg.g
			// A non-nil param tells the woken sender that its value was taken.
			gp.param = unsafe.Pointer(sg)
			if sg.releasetime != 0 {
				sg.releasetime = cputicks()
			}
			goready(gp, 3)
			selected = true
			received = true
			return
		}

		if !block {
			unlock(&c.lock)
			return
		}

		// no sender available: block on this channel.
		gp := getg()
		mysg := acquireSudog()
		mysg.releasetime = 0
		if t0 != 0 {
			// Non-zero asks whoever wakes us to store the wake-up time here.
			mysg.releasetime = -1
		}
		mysg.elem = ep
		mysg.waitlink = nil
		gp.waiting = mysg
		mysg.g = gp
		mysg.selectdone = nil
		gp.param = nil
		c.recvq.enqueue(mysg)
		goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

		// someone woke us up
		if mysg != gp.waiting {
			throw("G waiting list is corrupted!")
		}
		gp.waiting = nil
		if mysg.releasetime > 0 {
			blockevent(mysg.releasetime-t0, 2)
		}
		// param is non-nil when a sender woke us and nil when closechan did.
		haveData := gp.param != nil
		gp.param = nil
		releaseSudog(mysg)

		if haveData {
			// a sender sent us some data. It already wrote to ep.
			selected = true
			received = true
			return
		}

		// Woken without data: the channel must have been closed.
		lock(&c.lock)
		if c.closed == 0 {
			throw("chanrecv: spurious wakeup")
		}
		return recvclosed(c, ep)
	}

	// asynchronous channel
	// wait for some data to appear
	// t1 records the latest wake-up time written into our sudog, for block profiling.
	var t1 int64
	// qcount is unsigned, so "<= 0" means the buffer is empty. futile is 0 on
	// the first pass and traceFutileWakeup on every later pass.
	for futile := byte(0); c.qcount <= 0; futile = traceFutileWakeup {
		if c.closed != 0 {
			// Empty and closed: recvclosed releases c.lock and zeroes *ep.
			selected, received = recvclosed(c, ep)
			if t1 > 0 {
				blockevent(t1-t0, 2)
			}
			return
		}

		if !block {
			unlock(&c.lock)
			return
		}

		// wait for someone to send an element
		gp := getg()
		mysg := acquireSudog()
		mysg.releasetime = 0
		if t0 != 0 {
			mysg.releasetime = -1
		}
		mysg.elem = nil
		mysg.g = gp
		mysg.selectdone = nil

		c.recvq.enqueue(mysg)
		goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv|futile, 3)

		// someone woke us up - try again
		if mysg.releasetime > 0 {
			t1 = mysg.releasetime
		}
		releaseSudog(mysg)
		lock(&c.lock)
	}

	if raceenabled {
		raceacquire(chanbuf(c, c.recvx))
		racerelease(chanbuf(c, c.recvx))
	}
	if ep != nil {
		typedmemmove(c.elemtype, ep, chanbuf(c, c.recvx))
	}
	// Zero the slot that was just consumed.
	memclr(chanbuf(c, c.recvx), uintptr(c.elemsize))

	// Advance the receive index, wrapping around at the end of the buffer.
	c.recvx++
	if c.recvx == c.dataqsiz {
		c.recvx = 0
	}
	c.qcount--

	// ping a sender now that there is space
	sg := c.sendq.dequeue()
	if sg != nil {
		gp := sg.g
		unlock(&c.lock)
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		goready(gp, 3)
	} else {
		unlock(&c.lock)
	}

	if t1 > 0 {
		blockevent(t1-t0, 2)
	}
	selected = true
	received = true
	return
}
    550 
    551 // recvclosed is a helper function for chanrecv.  Handles cleanup
    552 // when the receiver encounters a closed channel.
    553 // Caller must hold c.lock, recvclosed will release the lock.
    554 func recvclosed(c *hchan, ep unsafe.Pointer) (selected, recevied bool) {
    555 	if raceenabled {
    556 		raceacquire(unsafe.Pointer(c))
    557 	}
    558 	unlock(&c.lock)
    559 	if ep != nil {
    560 		memclr(ep, uintptr(c.elemsize))
    561 	}
    562 	return true, false
    563 }
    564 
    565 // compiler implements
    566 //
    567 //	select {
    568 //	case c <- v:
    569 //		... foo
    570 //	default:
    571 //		... bar
    572 //	}
    573 //
    574 // as
    575 //
    576 //	if selectnbsend(c, v) {
    577 //		... foo
    578 //	} else {
    579 //		... bar
    580 //	}
    581 //
    582 func selectnbsend(t *chantype, c *hchan, elem unsafe.Pointer) (selected bool) {
    583 	return chansend(t, c, elem, false, getcallerpc(unsafe.Pointer(&t)))
    584 }
    585 
    586 // compiler implements
    587 //
    588 //	select {
    589 //	case v = <-c:
    590 //		... foo
    591 //	default:
    592 //		... bar
    593 //	}
    594 //
    595 // as
    596 //
    597 //	if selectnbrecv(&v, c) {
    598 //		... foo
    599 //	} else {
    600 //		... bar
    601 //	}
    602 //
    603 func selectnbrecv(t *chantype, elem unsafe.Pointer, c *hchan) (selected bool) {
    604 	selected, _ = chanrecv(t, c, elem, false)
    605 	return
    606 }
    607 
    608 // compiler implements
    609 //
    610 //	select {
    611 //	case v, ok = <-c:
    612 //		... foo
    613 //	default:
    614 //		... bar
    615 //	}
    616 //
    617 // as
    618 //
    619 //	if c != nil && selectnbrecv2(&v, &ok, c) {
    620 //		... foo
    621 //	} else {
    622 //		... bar
    623 //	}
    624 //
    625 func selectnbrecv2(t *chantype, elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
    626 	// TODO(khr): just return 2 values from this function, now that it is in Go.
    627 	selected, *received = chanrecv(t, c, elem, false)
    628 	return
    629 }
    630 
    631 //go:linkname reflect_chansend reflect.chansend
    632 func reflect_chansend(t *chantype, c *hchan, elem unsafe.Pointer, nb bool) (selected bool) {
    633 	return chansend(t, c, elem, !nb, getcallerpc(unsafe.Pointer(&t)))
    634 }
    635 
    636 //go:linkname reflect_chanrecv reflect.chanrecv
    637 func reflect_chanrecv(t *chantype, c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
    638 	return chanrecv(t, c, elem, !nb)
    639 }
    640 
    641 //go:linkname reflect_chanlen reflect.chanlen
    642 func reflect_chanlen(c *hchan) int {
    643 	if c == nil {
    644 		return 0
    645 	}
    646 	return int(c.qcount)
    647 }
    648 
    649 //go:linkname reflect_chancap reflect.chancap
    650 func reflect_chancap(c *hchan) int {
    651 	if c == nil {
    652 		return 0
    653 	}
    654 	return int(c.dataqsiz)
    655 }
    656 
// reflect_chanclose is exposed to package reflect as reflect.chanclose.
// It forwards to closechan, so it panics in the same cases: a nil channel or
// a channel that is already closed.
//
//go:linkname reflect_chanclose reflect.chanclose
func reflect_chanclose(c *hchan) {
	closechan(c)
}
    661 
    662 func (q *waitq) enqueue(sgp *sudog) {
    663 	sgp.next = nil
    664 	x := q.last
    665 	if x == nil {
    666 		sgp.prev = nil
    667 		q.first = sgp
    668 		q.last = sgp
    669 		return
    670 	}
    671 	sgp.prev = x
    672 	x.next = sgp
    673 	q.last = sgp
    674 }
    675 
// dequeue unlinks and returns the first usable sudog in q, or nil when the
// queue runs empty. A sudog whose selectdone pointer is non-nil is returned
// only if this call flips *selectdone from 0 to 1; otherwise it has already
// been unlinked and is skipped, and the loop moves on to the next entry.
func (q *waitq) dequeue() *sudog {
	for {
		sgp := q.first
		if sgp == nil {
			return nil
		}
		// Unlink sgp from the head of the list.
		y := sgp.next
		if y == nil {
			// sgp was the only element; the queue is now empty.
			q.first = nil
			q.last = nil
		} else {
			y.prev = nil
			q.first = y
			sgp.next = nil // mark as removed (see dequeueSudog)
		}

		// if sgp participates in a select and is already signaled, ignore it
		if sgp.selectdone != nil {
			// claim the right to signal
			// The plain read is a cheap pre-check; the cas decides the winner.
			if *sgp.selectdone != 0 || !cas(sgp.selectdone, 0, 1) {
				continue
			}
		}

		return sgp
	}
}
    703 
// racesync issues the race-detector annotations for a direct hand-off on c
// between the calling goroutine and the goroutine parked in sg. chansend and
// chanrecv call it, under raceenabled, after dequeuing a waiter on an
// unbuffered channel. All four calls use the address chanbuf(c, 0): a
// release by the caller, an acquire and a release on behalf of sg.g, then an
// acquire by the caller.
// NOTE(review): presumably this establishes happens-before edges in both
// directions between the two goroutines — confirm against the race runtime.
func racesync(c *hchan, sg *sudog) {
	racerelease(chanbuf(c, 0))
	raceacquireg(sg.g, chanbuf(c, 0))
	racereleaseg(sg.g, chanbuf(c, 0))
	raceacquire(chanbuf(c, 0))
}
    710