Home | History | Annotate | Download | only in runtime
      1 // Copyright 2009 The Go Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style
      3 // license that can be found in the LICENSE file.
      4 
      5 package runtime
      6 
      7 // This file contains the implementation of Go select statements.
      8 
      9 import (
     10 	"runtime/internal/sys"
     11 	"unsafe"
     12 )
     13 
// debugSelect gates the print-based tracing in this file (newselect,
// selectsend, selectrecv, selectdefault and selectgo). It is a constant,
// so the tracing branches are dead code while it is false.
const debugSelect = false
     15 
const (
	// scase.kind
	//
	// caseNil is the zero value. selectsend and selectrecv leave a slot
	// untouched when the channel is nil, and selectgo skips every case
	// whose kind is caseNil.
	caseNil = iota
	// caseRecv marks a receive case (filled in by selectrecv).
	caseRecv
	// caseSend marks a send case (filled in by selectsend).
	caseSend
	// caseDefault marks the default case (filled in by selectdefault).
	caseDefault
)
     23 
// Select statement header.
// Known to compiler.
// Changes here must also be made in src/cmd/internal/gc/select.go's selecttype.
//
// The header is the start of a larger variable-size block (see selectsize):
// scase is declared with length 1 but holds tcase entries, and newselect
// points lockorder at the memory immediately following the tcase-th scase
// and pollorder immediately following the lockorder array.
type hselect struct {
	tcase     uint16   // total count of scase[]
	ncase     uint16   // currently filled scase[]
	pollorder *uint16  // case poll order
	lockorder *uint16  // channel lock order
	scase     [1]scase // one per case (in order of appearance)
}
     34 
// Select case descriptor.
// Known to compiler.
// Changes here must also be made in src/cmd/internal/gc/select.go's selecttype.
type scase struct {
	elem        unsafe.Pointer // data element
	c           *hchan         // chan
	pc          uintptr        // return pc (for race detector / msan)
	kind        uint16         // one of caseNil, caseRecv, caseSend, caseDefault
	receivedp   *bool // pointer to received bool, if any
	// releasetime is initialised to -1 by selectgo when block profiling
	// is enabled, later overwritten with the matching sudog's releasetime
	// if that is positive, and finally reported through blockevent.
	releasetime int64
}
     46 
// Entry PCs of chansend and chanrecv. selectgo passes them to the race
// detector hooks (racereadpc, raceReadObjectPC, raceWriteObjectPC)
// alongside the PC recorded in the select case.
var (
	chansendpc = funcPC(chansend)
	chanrecvpc = funcPC(chanrecv)
)
     51 
     52 func selectsize(size uintptr) uintptr {
     53 	selsize := unsafe.Sizeof(hselect{}) +
     54 		(size-1)*unsafe.Sizeof(hselect{}.scase[0]) +
     55 		size*unsafe.Sizeof(*hselect{}.lockorder) +
     56 		size*unsafe.Sizeof(*hselect{}.pollorder)
     57 	return round(selsize, sys.Int64Align)
     58 }
     59 
     60 func newselect(sel *hselect, selsize int64, size int32) {
     61 	if selsize != int64(selectsize(uintptr(size))) {
     62 		print("runtime: bad select size ", selsize, ", want ", selectsize(uintptr(size)), "\n")
     63 		throw("bad select size")
     64 	}
     65 	sel.tcase = uint16(size)
     66 	sel.ncase = 0
     67 	sel.lockorder = (*uint16)(add(unsafe.Pointer(&sel.scase), uintptr(size)*unsafe.Sizeof(hselect{}.scase[0])))
     68 	sel.pollorder = (*uint16)(add(unsafe.Pointer(sel.lockorder), uintptr(size)*unsafe.Sizeof(*hselect{}.lockorder)))
     69 
     70 	if debugSelect {
     71 		print("newselect s=", sel, " size=", size, "\n")
     72 	}
     73 }
     74 
     75 func selectsend(sel *hselect, c *hchan, elem unsafe.Pointer) {
     76 	pc := getcallerpc()
     77 	i := sel.ncase
     78 	if i >= sel.tcase {
     79 		throw("selectsend: too many cases")
     80 	}
     81 	sel.ncase = i + 1
     82 	if c == nil {
     83 		return
     84 	}
     85 	cas := (*scase)(add(unsafe.Pointer(&sel.scase), uintptr(i)*unsafe.Sizeof(sel.scase[0])))
     86 	cas.pc = pc
     87 	cas.c = c
     88 	cas.kind = caseSend
     89 	cas.elem = elem
     90 
     91 	if debugSelect {
     92 		print("selectsend s=", sel, " pc=", hex(cas.pc), " chan=", cas.c, "\n")
     93 	}
     94 }
     95 
     96 func selectrecv(sel *hselect, c *hchan, elem unsafe.Pointer, received *bool) {
     97 	pc := getcallerpc()
     98 	i := sel.ncase
     99 	if i >= sel.tcase {
    100 		throw("selectrecv: too many cases")
    101 	}
    102 	sel.ncase = i + 1
    103 	if c == nil {
    104 		return
    105 	}
    106 	cas := (*scase)(add(unsafe.Pointer(&sel.scase), uintptr(i)*unsafe.Sizeof(sel.scase[0])))
    107 	cas.pc = pc
    108 	cas.c = c
    109 	cas.kind = caseRecv
    110 	cas.elem = elem
    111 	cas.receivedp = received
    112 
    113 	if debugSelect {
    114 		print("selectrecv s=", sel, " pc=", hex(cas.pc), " chan=", cas.c, "\n")
    115 	}
    116 }
    117 
    118 func selectdefault(sel *hselect) {
    119 	pc := getcallerpc()
    120 	i := sel.ncase
    121 	if i >= sel.tcase {
    122 		throw("selectdefault: too many cases")
    123 	}
    124 	sel.ncase = i + 1
    125 	cas := (*scase)(add(unsafe.Pointer(&sel.scase), uintptr(i)*unsafe.Sizeof(sel.scase[0])))
    126 	cas.pc = pc
    127 	cas.c = nil
    128 	cas.kind = caseDefault
    129 
    130 	if debugSelect {
    131 		print("selectdefault s=", sel, " pc=", hex(cas.pc), "\n")
    132 	}
    133 }
    134 
    135 func sellock(scases []scase, lockorder []uint16) {
    136 	var c *hchan
    137 	for _, o := range lockorder {
    138 		c0 := scases[o].c
    139 		if c0 != nil && c0 != c {
    140 			c = c0
    141 			lock(&c.lock)
    142 		}
    143 	}
    144 }
    145 
    146 func selunlock(scases []scase, lockorder []uint16) {
    147 	// We must be very careful here to not touch sel after we have unlocked
    148 	// the last lock, because sel can be freed right after the last unlock.
    149 	// Consider the following situation.
    150 	// First M calls runtimepark() in runtimeselectgo() passing the sel.
    151 	// Once runtimepark() has unlocked the last lock, another M makes
    152 	// the G that calls select runnable again and schedules it for execution.
    153 	// When the G runs on another M, it locks all the locks and frees sel.
    154 	// Now if the first M touches sel, it will access freed memory.
    155 	for i := len(scases) - 1; i >= 0; i-- {
    156 		c := scases[lockorder[i]].c
    157 		if c == nil {
    158 			break
    159 		}
    160 		if i > 0 && c == scases[lockorder[i-1]].c {
    161 			continue // will unlock it on the next iteration
    162 		}
    163 		unlock(&c.lock)
    164 	}
    165 }
    166 
    167 func selparkcommit(gp *g, _ unsafe.Pointer) bool {
    168 	// This must not access gp's stack (see gopark). In
    169 	// particular, it must not access the *hselect. That's okay,
    170 	// because by the time this is called, gp.waiting has all
    171 	// channels in lock order.
    172 	var lastc *hchan
    173 	for sg := gp.waiting; sg != nil; sg = sg.waitlink {
    174 		if sg.c != lastc && lastc != nil {
    175 			// As soon as we unlock the channel, fields in
    176 			// any sudog with that channel may change,
    177 			// including c and waitlink. Since multiple
    178 			// sudogs may have the same channel, we unlock
    179 			// only after we've passed the last instance
    180 			// of a channel.
    181 			unlock(&lastc.lock)
    182 		}
    183 		lastc = sg.c
    184 	}
    185 	if lastc != nil {
    186 		unlock(&lastc.lock)
    187 	}
    188 	return true
    189 }
    190 
    191 func block() {
    192 	gopark(nil, nil, "select (no cases)", traceEvGoStop, 1) // forever
    193 }
    194 
// selectgo implements the select statement.
//
// *sel is on the current goroutine's stack (regardless of any
// escaping in selectgo).
//
// selectgo returns the index of the chosen scase, which matches the
// ordinal position of its respective select{recv,send,default} call.
//
// It throws if not every case slot has been filled (ncase != tcase) and
// panics with "send on closed channel" if a send case is found on a
// closed channel.
//
// Outline:
//   - build a randomized poll order and an address-sorted lock order;
//   - lock all channels;
//   - pass 1: scan the cases in poll order for one that can proceed
//     immediately, falling back to the default case if there is one;
//   - pass 2: otherwise enqueue a sudog on every channel and park;
//   - pass 3: after wake-up, dequeue the sudogs of the cases that did not
//     fire and identify the one that did; if none did, retry from pass 1.
func selectgo(sel *hselect) int {
	if debugSelect {
		print("select: sel=", sel, "\n")
	}
	if sel.ncase != sel.tcase {
		throw("selectgo: case count mismatch")
	}

	// View the variable-length scase array as an ordinary slice.
	scaseslice := slice{unsafe.Pointer(&sel.scase), int(sel.ncase), int(sel.ncase)}
	scases := *(*[]scase)(unsafe.Pointer(&scaseslice))

	// t0 stays 0 unless block profiling is enabled; a non-zero t0 is what
	// later causes sudogs to be created with releasetime = -1.
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
		for i := 0; i < int(sel.ncase); i++ {
			scases[i].releasetime = -1
		}
	}

	// The compiler rewrites selects that statically have
	// only 0 or 1 cases plus default into simpler constructs.
	// The only way we can end up with such small sel.ncase
	// values here is for a larger select in which most channels
	// have been nilled out. The general code handles those
	// cases correctly, and they are rare enough not to bother
	// optimizing (and needing to test).

	// generate permuted order
	// Each step picks a slot j in the already-built prefix, moves its
	// occupant to position i and stores i at j.
	// NOTE(review): this relies on pollorder[0] starting out as 0 and on
	// fastrandn(n) returning a value below n — confirm both.
	pollslice := slice{unsafe.Pointer(sel.pollorder), int(sel.ncase), int(sel.ncase)}
	pollorder := *(*[]uint16)(unsafe.Pointer(&pollslice))
	for i := 1; i < int(sel.ncase); i++ {
		j := fastrandn(uint32(i + 1))
		pollorder[i] = pollorder[j]
		pollorder[j] = uint16(i)
	}

	// sort the cases by Hchan address to get the locking order.
	// simple heap sort, to guarantee n log n time and constant stack footprint.
	lockslice := slice{unsafe.Pointer(sel.lockorder), int(sel.ncase), int(sel.ncase)}
	lockorder := *(*[]uint16)(unsafe.Pointer(&lockslice))
	// Phase 1: insert the cases one at a time, sifting each up past
	// parents with a smaller sortkey, so the largest key ends at index 0.
	for i := 0; i < int(sel.ncase); i++ {
		j := i
		// Start with the pollorder to permute cases on the same channel.
		c := scases[pollorder[i]].c
		for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
			k := (j - 1) / 2
			lockorder[j] = lockorder[k]
			j = k
		}
		lockorder[j] = pollorder[i]
	}
	// Phase 2: repeatedly move the root (largest key) to position i, then
	// sift the displaced element o down through the shrunken heap [0, i).
	for i := int(sel.ncase) - 1; i >= 0; i-- {
		o := lockorder[i]
		c := scases[o].c
		lockorder[i] = lockorder[0]
		j := 0
		for {
			k := j*2 + 1
			if k >= i {
				break
			}
			// Descend towards the child with the larger key.
			if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
				k++
			}
			if c.sortkey() < scases[lockorder[k]].c.sortkey() {
				lockorder[j] = lockorder[k]
				j = k
				continue
			}
			break
		}
		lockorder[j] = o
	}
	/*
		for i := 0; i+1 < int(sel.ncase); i++ {
			if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {
				print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")
				throw("select: broken sort")
			}
		}
	*/

	// lock all the channels involved in the select
	sellock(scases, lockorder)

	// Declared up front because the gotos below may not jump over
	// variable declarations.
	var (
		gp     *g
		sg     *sudog
		c      *hchan
		k      *scase
		sglist *sudog
		sgnext *sudog
		qp     unsafe.Pointer
		nextp  **sudog
	)

loop:
	// pass 1 - look for something already waiting
	var dfli int
	var dfl *scase
	var casi int
	var cas *scase
	for i := 0; i < int(sel.ncase); i++ {
		casi = int(pollorder[i])
		cas = &scases[casi]
		c = cas.c

		switch cas.kind {
		case caseNil:
			continue

		case caseRecv:
			// Preference order: a waiting sender, then buffered
			// data, then a closed channel.
			sg = c.sendq.dequeue()
			if sg != nil {
				goto recv
			}
			if c.qcount > 0 {
				goto bufrecv
			}
			if c.closed != 0 {
				goto rclose
			}

		case caseSend:
			if raceenabled {
				racereadpc(unsafe.Pointer(c), cas.pc, chansendpc)
			}
			// The closed check comes first: a send on a closed
			// channel panics even if it could otherwise proceed.
			if c.closed != 0 {
				goto sclose
			}
			sg = c.recvq.dequeue()
			if sg != nil {
				goto send
			}
			if c.qcount < c.dataqsiz {
				goto bufsend
			}

		case caseDefault:
			// Remember the default case; it is only taken if no
			// other case is ready.
			dfli = casi
			dfl = cas
		}
	}

	if dfl != nil {
		selunlock(scases, lockorder)
		casi = dfli
		cas = dfl
		goto retc
	}

	// pass 2 - enqueue on all chans
	gp = getg()
	if gp.waiting != nil {
		throw("gp.waiting != nil")
	}
	nextp = &gp.waiting
	for _, casei := range lockorder {
		casi = int(casei)
		cas = &scases[casi]
		if cas.kind == caseNil {
			continue
		}
		c = cas.c
		sg := acquireSudog()
		sg.g = gp
		sg.isSelect = true
		// No stack splits between assigning elem and enqueuing
		// sg on gp.waiting where copystack can find it.
		sg.elem = cas.elem
		sg.releasetime = 0
		if t0 != 0 {
			sg.releasetime = -1
		}
		sg.c = c
		// Construct waiting list in lock order.
		*nextp = sg
		nextp = &sg.waitlink

		switch cas.kind {
		case caseRecv:
			c.recvq.enqueue(sg)

		case caseSend:
			c.sendq.enqueue(sg)
		}
	}

	// wait for someone to wake us up
	// selparkcommit releases the channel locks once we are parked.
	gp.param = nil
	gopark(selparkcommit, nil, "select", traceEvGoBlockSelect, 1)

	sellock(scases, lockorder)

	// gp.param, if non-nil, is the sudog of the case that fired.
	gp.selectDone = 0
	sg = (*sudog)(gp.param)
	gp.param = nil

	// pass 3 - dequeue from unsuccessful chans
	// otherwise they stack up on quiet channels
	// record the successful case, if any.
	// We singly-linked up the SudoGs in lock order.
	casi = -1
	cas = nil
	sglist = gp.waiting
	// Clear all elem before unlinking from gp.waiting.
	for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
		sg1.isSelect = false
		sg1.elem = nil
		sg1.c = nil
	}
	gp.waiting = nil

	// Walk lockorder and the sudog list in lockstep: pass 2 created one
	// sudog per non-nil case, in this same order.
	for _, casei := range lockorder {
		k = &scases[casei]
		if k.kind == caseNil {
			continue
		}
		if sglist.releasetime > 0 {
			k.releasetime = sglist.releasetime
		}
		if sg == sglist {
			// sg has already been dequeued by the G that woke us up.
			casi = int(casei)
			cas = k
		} else {
			c = k.c
			if k.kind == caseSend {
				c.sendq.dequeueSudoG(sglist)
			} else {
				c.recvq.dequeueSudoG(sglist)
			}
		}
		sgnext = sglist.waitlink
		sglist.waitlink = nil
		releaseSudog(sglist)
		sglist = sgnext
	}

	if cas == nil {
		// We can wake up with gp.param == nil (so cas == nil)
		// when a channel involved in the select has been closed.
		// It is easiest to loop and re-run the operation;
		// we'll see that it's now closed.
		// Maybe some day we can signal the close explicitly,
		// but we'd have to distinguish close-on-reader from close-on-writer.
		// It's easiest not to duplicate the code and just recheck above.
		// We know that something closed, and things never un-close,
		// so we won't block again.
		goto loop
	}

	c = cas.c

	if debugSelect {
		print("wait-return: sel=", sel, " c=", c, " cas=", cas, " kind=", cas.kind, "\n")
	}

	if cas.kind == caseRecv && cas.receivedp != nil {
		*cas.receivedp = true
	}

	if raceenabled {
		if cas.kind == caseRecv && cas.elem != nil {
			raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)
		} else if cas.kind == caseSend {
			raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
		}
	}
	if msanenabled {
		if cas.kind == caseRecv && cas.elem != nil {
			msanwrite(cas.elem, c.elemtype.size)
		} else if cas.kind == caseSend {
			msanread(cas.elem, c.elemtype.size)
		}
	}

	selunlock(scases, lockorder)
	goto retc

bufrecv:
	// can receive from buffer
	if raceenabled {
		if cas.elem != nil {
			raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)
		}
		raceacquire(chanbuf(c, c.recvx))
		racerelease(chanbuf(c, c.recvx))
	}
	if msanenabled && cas.elem != nil {
		msanwrite(cas.elem, c.elemtype.size)
	}
	if cas.receivedp != nil {
		*cas.receivedp = true
	}
	// Copy the element out (if the caller wants it), clear the buffer
	// slot, and advance the receive index with wrap-around.
	qp = chanbuf(c, c.recvx)
	if cas.elem != nil {
		typedmemmove(c.elemtype, cas.elem, qp)
	}
	typedmemclr(c.elemtype, qp)
	c.recvx++
	if c.recvx == c.dataqsiz {
		c.recvx = 0
	}
	c.qcount--
	selunlock(scases, lockorder)
	goto retc

bufsend:
	// can send to buffer
	if raceenabled {
		raceacquire(chanbuf(c, c.sendx))
		racerelease(chanbuf(c, c.sendx))
		raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
	}
	if msanenabled {
		msanread(cas.elem, c.elemtype.size)
	}
	// Copy the element into the buffer and advance the send index with
	// wrap-around.
	typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
	c.sendx++
	if c.sendx == c.dataqsiz {
		c.sendx = 0
	}
	c.qcount++
	selunlock(scases, lockorder)
	goto retc

recv:
	// can receive from sleeping sender (sg)
	// The closure passed to recv is what releases the channel locks.
	recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
	if debugSelect {
		print("syncrecv: sel=", sel, " c=", c, "\n")
	}
	if cas.receivedp != nil {
		*cas.receivedp = true
	}
	goto retc

rclose:
	// read at end of closed channel
	// The receiver gets a zeroed element and received == false.
	selunlock(scases, lockorder)
	if cas.receivedp != nil {
		*cas.receivedp = false
	}
	if cas.elem != nil {
		typedmemclr(c.elemtype, cas.elem)
	}
	if raceenabled {
		raceacquire(unsafe.Pointer(c))
	}
	goto retc

send:
	// can send to a sleeping receiver (sg)
	if raceenabled {
		raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
	}
	if msanenabled {
		msanread(cas.elem, c.elemtype.size)
	}
	// The closure passed to send is what releases the channel locks.
	send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
	if debugSelect {
		print("syncsend: sel=", sel, " c=", c, "\n")
	}
	goto retc

retc:
	// Common exit: report a block-profile event if a release time was
	// recorded for the chosen case, then return its index.
	if cas.releasetime > 0 {
		blockevent(cas.releasetime-t0, 1)
	}
	return casi

sclose:
	// send on closed channel
	selunlock(scases, lockorder)
	panic(plainError("send on closed channel"))
}
    579 
    580 func (c *hchan) sortkey() uintptr {
    581 	// TODO(khr): if we have a moving garbage collector, we'll need to
    582 	// change this function.
    583 	return uintptr(unsafe.Pointer(c))
    584 }
    585 
// A runtimeSelect is a single case passed to rselect.
// This must match ../reflect/value.go:/runtimeSelect
//
// reflect_rselect translates each entry into a selectsend, selectrecv or
// selectdefault call according to dir.
type runtimeSelect struct {
	dir selectDir      // which kind of case this is
	typ unsafe.Pointer // channel type (not used here)
	ch  *hchan         // channel
	val unsafe.Pointer // ptr to data (SendDir) or ptr to receive buffer (RecvDir)
}
    594 
// These values must match ../reflect/value.go:/SelectDir.
type selectDir int

// The zero value is deliberately skipped, so valid directions start at 1.
const (
	_             selectDir = iota
	selectSend              // case Chan <- Send
	selectRecv              // case <-Chan:
	selectDefault           // default
)
    604 
    605 //go:linkname reflect_rselect reflect.rselect
    606 func reflect_rselect(cases []runtimeSelect) (chosen int, recvOK bool) {
    607 	// flagNoScan is safe here, because all objects are also referenced from cases.
    608 	size := selectsize(uintptr(len(cases)))
    609 	sel := (*hselect)(mallocgc(size, nil, true))
    610 	newselect(sel, int64(size), int32(len(cases)))
    611 	r := new(bool)
    612 	for i := range cases {
    613 		rc := &cases[i]
    614 		switch rc.dir {
    615 		case selectDefault:
    616 			selectdefault(sel)
    617 		case selectSend:
    618 			selectsend(sel, rc.ch, rc.val)
    619 		case selectRecv:
    620 			selectrecv(sel, rc.ch, rc.val, r)
    621 		}
    622 	}
    623 
    624 	chosen = selectgo(sel)
    625 	recvOK = *r
    626 	return
    627 }
    628 
    629 func (q *waitq) dequeueSudoG(sgp *sudog) {
    630 	x := sgp.prev
    631 	y := sgp.next
    632 	if x != nil {
    633 		if y != nil {
    634 			// middle of queue
    635 			x.next = y
    636 			y.prev = x
    637 			sgp.next = nil
    638 			sgp.prev = nil
    639 			return
    640 		}
    641 		// end of queue
    642 		x.next = nil
    643 		q.last = x
    644 		sgp.prev = nil
    645 		return
    646 	}
    647 	if y != nil {
    648 		// start of queue
    649 		y.prev = nil
    650 		q.first = y
    651 		sgp.next = nil
    652 		return
    653 	}
    654 
    655 	// x==y==nil. Either sgp is the only element in the queue,
    656 	// or it has already been removed. Use q.first to disambiguate.
    657 	if q.first == sgp {
    658 		q.first = nil
    659 		q.last = nil
    660 	}
    661 }
    662