// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package runtime

// This file contains the implementation of Go channels.

import "unsafe"

const (
	maxAlign  = 8
	hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
	debugChan = false
)

type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters
	lock     mutex
}

type waitq struct {
	first *sudog
	last  *sudog
}

//go:linkname reflect_makechan reflect.makechan
func reflect_makechan(t *chantype, size int64) *hchan {
	return makechan(t, size)
}

func makechan(t *chantype, size int64) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}
	if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/uintptr(elem.size)) {
		panic("makechan: size out of range")
	}

	var c *hchan
	if elem.kind&kindNoPointers != 0 || size == 0 {
		// Allocate memory in one call.
		// Hchan does not contain pointers interesting for GC in this case:
		// buf points into the same allocation, elemtype is persistent.
		// SudoG's are referenced from their owning thread so they can't be collected.
		// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
		c = (*hchan)(mallocgc(hchanSize+uintptr(size)*uintptr(elem.size), nil, flagNoScan))
		if size > 0 && elem.size != 0 {
			c.buf = add(unsafe.Pointer(c), hchanSize)
		} else {
			// race detector uses this location for synchronization
			// Also prevents us from pointing beyond the allocation (see issue 9401).
			c.buf = unsafe.Pointer(c)
		}
	} else {
		c = new(hchan)
		c.buf = newarray(elem, uintptr(size))
	}
	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
	}
	return c
}

// chanbuf(c, i) is pointer to the i'th slot in the buffer.
func chanbuf(c *hchan, i uint) unsafe.Pointer {
	return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}
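
// Illustrative sketch, not part of the runtime API: buf behaves as a circular
// queue of dataqsiz slots. For a channel created with
//
//	c := make(chan int, 4)
//
// makechan allocates the hchan header plus a 4-slot buffer; sends copy the
// element into chanbuf(c, c.sendx) and advance sendx modulo dataqsiz,
// receives copy out of chanbuf(c, c.recvx) and advance recvx the same way,
// and qcount tracks how many slots are currently full.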

// entry point for c <- x from compiled code
//go:nosplit
func chansend1(t *chantype, c *hchan, elem unsafe.Pointer) {
	chansend(t, c, elem, true, getcallerpc(unsafe.Pointer(&t)))
}

/*
 * generic single channel send/recv
 * If block is false,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed. it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if raceenabled {
		raceReadObjectPC(t.elem, ep, callerpc, funcPC(chansend))
	}

	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(unsafe.Pointer(c), callerpc, funcPC(chansend))
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation.
	if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
		(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)
	if c.closed != 0 {
		unlock(&c.lock)
		panic("send on closed channel")
	}

	if c.dataqsiz == 0 { // synchronous channel
		sg := c.recvq.dequeue()
		if sg != nil { // found a waiting receiver
			if raceenabled {
				racesync(c, sg)
			}
			unlock(&c.lock)

			recvg := sg.g
			if sg.elem != nil {
				syncsend(c, sg, ep)
			}
			recvg.param = unsafe.Pointer(sg)
			if sg.releasetime != 0 {
				sg.releasetime = cputicks()
			}
			goready(recvg, 3)
			return true
		}

		if !block {
			unlock(&c.lock)
			return false
		}

		// no receiver available: block on this channel.
		gp := getg()
		mysg := acquireSudog()
		mysg.releasetime = 0
		if t0 != 0 {
			mysg.releasetime = -1
		}
		mysg.elem = ep
		mysg.waitlink = nil
		gp.waiting = mysg
		mysg.g = gp
		mysg.selectdone = nil
		gp.param = nil
		c.sendq.enqueue(mysg)
		goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

		// someone woke us up.
		if mysg != gp.waiting {
			throw("G waiting list is corrupted!")
		}
		gp.waiting = nil
		if gp.param == nil {
			if c.closed == 0 {
				throw("chansend: spurious wakeup")
			}
			panic("send on closed channel")
		}
		gp.param = nil
		if mysg.releasetime > 0 {
			blockevent(int64(mysg.releasetime)-t0, 2)
		}
		releaseSudog(mysg)
		return true
	}

	// asynchronous channel
	// wait for some space to write our data
	var t1 int64
	for futile := byte(0); c.qcount >= c.dataqsiz; futile = traceFutileWakeup {
		if !block {
			unlock(&c.lock)
			return false
		}
		gp := getg()
		mysg := acquireSudog()
		mysg.releasetime = 0
		if t0 != 0 {
			mysg.releasetime = -1
		}
		mysg.g = gp
		mysg.elem = nil
		mysg.selectdone = nil
		c.sendq.enqueue(mysg)
		goparkunlock(&c.lock, "chan send", traceEvGoBlockSend|futile, 3)

		// someone woke us up - try again
		if mysg.releasetime > 0 {
			t1 = mysg.releasetime
		}
		releaseSudog(mysg)
		lock(&c.lock)
		if c.closed != 0 {
			unlock(&c.lock)
			panic("send on closed channel")
		}
	}

	// write our data into the channel buffer
	if raceenabled {
		raceacquire(chanbuf(c, c.sendx))
		racerelease(chanbuf(c, c.sendx))
	}
	typedmemmove(c.elemtype, chanbuf(c, c.sendx), ep)
	c.sendx++
	if c.sendx == c.dataqsiz {
		c.sendx = 0
	}
	c.qcount++

	// wake up a waiting receiver
	sg := c.recvq.dequeue()
	if sg != nil {
		recvg := sg.g
		unlock(&c.lock)
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		goready(recvg, 3)
	} else {
		unlock(&c.lock)
	}
	if t1 > 0 {
		blockevent(t1-t0, 2)
	}
	return true
}
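
// Illustrative sketch, not part of the runtime: on an unbuffered channel
// (dataqsiz == 0) a send such as
//
//	c := make(chan int)
//	c <- 1
//
// blocks until a receiver is queued on recvq; once one is found, the value is
// copied straight into the receiver's stack slot by syncsend below, never
// passing through a buffer.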

func syncsend(c *hchan, sg *sudog, elem unsafe.Pointer) {
	// Send on unbuffered channel is the only operation
	// in the entire runtime where one goroutine
	// writes to the stack of another goroutine. The GC assumes that
	// stack writes only happen when the goroutine is running and are
	// only done by that goroutine. Using a write barrier is sufficient to
	// make up for violating that assumption, but the write barrier has to work.
	// typedmemmove will call heapBitsBulkBarrier, but the target bytes
	// are not in the heap, so that will not help. We arrange to call
	// memmove and typeBitsBulkBarrier instead.
	memmove(sg.elem, elem, c.elemtype.size)
	typeBitsBulkBarrier(c.elemtype, uintptr(sg.elem), c.elemtype.size)
	sg.elem = nil
}

func closechan(c *hchan) {
	if c == nil {
		panic("close of nil channel")
	}

	lock(&c.lock)
	if c.closed != 0 {
		unlock(&c.lock)
		panic("close of closed channel")
	}

	if raceenabled {
		callerpc := getcallerpc(unsafe.Pointer(&c))
		racewritepc(unsafe.Pointer(c), callerpc, funcPC(closechan))
		racerelease(unsafe.Pointer(c))
	}

	c.closed = 1

	// release all readers
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		gp := sg.g
		sg.elem = nil
		gp.param = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		goready(gp, 3)
	}

	// release all writers
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		gp := sg.g
		sg.elem = nil
		gp.param = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		goready(gp, 3)
	}
	unlock(&c.lock)
}

// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(t *chantype, c *hchan, elem unsafe.Pointer) {
	chanrecv(t, c, elem, true)
}

//go:nosplit
func chanrecv2(t *chantype, c *hchan, elem unsafe.Pointer) (received bool) {
	_, received = chanrecv(t, c, elem, true)
	return
}
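
// Illustrative sketch, not part of the runtime: the compiler lowers the two
// receive forms to the entry points above, roughly
//
//	v := <-c     // chanrecv1(t, c, &v)
//	v, ok := <-c // ok = chanrecv2(t, c, &v)
//
// where ok reports whether the value came from a send (true) rather than from
// the channel being closed and drained (false); see chanrecv below.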

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
		throw("unreachable")
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not ready for receiving, we observe that the
	// channel is not closed. Each of these observations is a single word-sized read
	// (first c.sendq.first or c.qcount, and second c.closed).
	// Because a channel cannot be reopened, the later observation of the channel
	// being not closed implies that it was also not closed at the moment of the
	// first observation. We behave as if we observed the channel at that moment
	// and report that the receive cannot proceed.
	//
	// The order of operations is important here: reversing the operations can lead to
	// incorrect behavior when racing with a close.
	if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
		c.dataqsiz > 0 && atomicloaduint(&c.qcount) == 0) &&
		atomicload(&c.closed) == 0 {
		return
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)
	if c.dataqsiz == 0 { // synchronous channel
		if c.closed != 0 {
			return recvclosed(c, ep)
		}

		sg := c.sendq.dequeue()
		if sg != nil {
			if raceenabled {
				racesync(c, sg)
			}
			unlock(&c.lock)

			if ep != nil {
				typedmemmove(c.elemtype, ep, sg.elem)
			}
			sg.elem = nil
			gp := sg.g
			gp.param = unsafe.Pointer(sg)
			if sg.releasetime != 0 {
				sg.releasetime = cputicks()
			}
			goready(gp, 3)
			selected = true
			received = true
			return
		}

		if !block {
			unlock(&c.lock)
			return
		}

		// no sender available: block on this channel.
		gp := getg()
		mysg := acquireSudog()
		mysg.releasetime = 0
		if t0 != 0 {
			mysg.releasetime = -1
		}
		mysg.elem = ep
		mysg.waitlink = nil
		gp.waiting = mysg
		mysg.g = gp
		mysg.selectdone = nil
		gp.param = nil
		c.recvq.enqueue(mysg)
		goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

		// someone woke us up
		if mysg != gp.waiting {
			throw("G waiting list is corrupted!")
		}
		gp.waiting = nil
		if mysg.releasetime > 0 {
			blockevent(mysg.releasetime-t0, 2)
		}
		haveData := gp.param != nil
		gp.param = nil
		releaseSudog(mysg)

		if haveData {
			// a sender sent us some data. It already wrote to ep.
			selected = true
			received = true
			return
		}

		lock(&c.lock)
		if c.closed == 0 {
			throw("chanrecv: spurious wakeup")
		}
		return recvclosed(c, ep)
	}

	// asynchronous channel
	// wait for some data to appear
	var t1 int64
	for futile := byte(0); c.qcount <= 0; futile = traceFutileWakeup {
		if c.closed != 0 {
			selected, received = recvclosed(c, ep)
			if t1 > 0 {
				blockevent(t1-t0, 2)
			}
			return
		}

		if !block {
			unlock(&c.lock)
			return
		}

		// wait for someone to send an element
		gp := getg()
		mysg := acquireSudog()
		mysg.releasetime = 0
		if t0 != 0 {
			mysg.releasetime = -1
		}
		mysg.elem = nil
		mysg.g = gp
		mysg.selectdone = nil

		c.recvq.enqueue(mysg)
		goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv|futile, 3)

		// someone woke us up - try again
		if mysg.releasetime > 0 {
			t1 = mysg.releasetime
		}
		releaseSudog(mysg)
		lock(&c.lock)
	}

	if raceenabled {
		raceacquire(chanbuf(c, c.recvx))
		racerelease(chanbuf(c, c.recvx))
	}
	if ep != nil {
		typedmemmove(c.elemtype, ep, chanbuf(c, c.recvx))
	}
	memclr(chanbuf(c, c.recvx), uintptr(c.elemsize))

	c.recvx++
	if c.recvx == c.dataqsiz {
		c.recvx = 0
	}
	c.qcount--

	// ping a sender now that there is space
	sg := c.sendq.dequeue()
	if sg != nil {
		gp := sg.g
		unlock(&c.lock)
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		goready(gp, 3)
	} else {
		unlock(&c.lock)
	}

	if t1 > 0 {
		blockevent(t1-t0, 2)
	}
	selected = true
	received = true
	return
}

// recvclosed is a helper function for chanrecv. Handles cleanup
// when the receiver encounters a closed channel.
// Caller must hold c.lock, recvclosed will release the lock.
func recvclosed(c *hchan, ep unsafe.Pointer) (selected, received bool) {
	if raceenabled {
		raceacquire(unsafe.Pointer(c))
	}
	unlock(&c.lock)
	if ep != nil {
		memclr(ep, uintptr(c.elemsize))
	}
	return true, false
}

// compiler implements
//
//	select {
//	case c <- v:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selectnbsend(c, v) {
//		... foo
//	} else {
//		... bar
//	}
//
func selectnbsend(t *chantype, c *hchan, elem unsafe.Pointer) (selected bool) {
	return chansend(t, c, elem, false, getcallerpc(unsafe.Pointer(&t)))
}

// compiler implements
//
//	select {
//	case v = <-c:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selectnbrecv(&v, c) {
//		... foo
//	} else {
//		... bar
//	}
//
func selectnbrecv(t *chantype, elem unsafe.Pointer, c *hchan) (selected bool) {
	selected, _ = chanrecv(t, c, elem, false)
	return
}

// compiler implements
//
//	select {
//	case v, ok = <-c:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if c != nil && selectnbrecv2(&v, &ok, c) {
//		... foo
//	} else {
//		... bar
//	}
//
func selectnbrecv2(t *chantype, elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
	// TODO(khr): just return 2 values from this function, now that it is in Go.
	selected, *received = chanrecv(t, c, elem, false)
	return
}

//go:linkname reflect_chansend reflect.chansend
func reflect_chansend(t *chantype, c *hchan, elem unsafe.Pointer, nb bool) (selected bool) {
	return chansend(t, c, elem, !nb, getcallerpc(unsafe.Pointer(&t)))
}

//go:linkname reflect_chanrecv reflect.chanrecv
func reflect_chanrecv(t *chantype, c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
	return chanrecv(t, c, elem, !nb)
}

//go:linkname reflect_chanlen reflect.chanlen
func reflect_chanlen(c *hchan) int {
	if c == nil {
		return 0
	}
	return int(c.qcount)
}

//go:linkname reflect_chancap reflect.chancap
func reflect_chancap(c *hchan) int {
	if c == nil {
		return 0
	}
	return int(c.dataqsiz)
}

//go:linkname reflect_chanclose reflect.chanclose
func reflect_chanclose(c *hchan) {
	closechan(c)
}

func (q *waitq) enqueue(sgp *sudog) {
	sgp.next = nil
	x := q.last
	if x == nil {
		sgp.prev = nil
		q.first = sgp
		q.last = sgp
		return
	}
	sgp.prev = x
	x.next = sgp
	q.last = sgp
}

func (q *waitq) dequeue() *sudog {
	for {
		sgp := q.first
		if sgp == nil {
			return nil
		}
		y := sgp.next
		if y == nil {
			q.first = nil
			q.last = nil
		} else {
			y.prev = nil
			q.first = y
			sgp.next = nil // mark as removed (see dequeueSudog)
		}

		// if sgp participates in a select and is already signaled, ignore it
		if sgp.selectdone != nil {
			// claim the right to signal
			if *sgp.selectdone != 0 || !cas(sgp.selectdone, 0, 1) {
				continue
			}
		}

		return sgp
	}
}

func racesync(c *hchan, sg *sudog) {
	racerelease(chanbuf(c, 0))
	raceacquireg(sg.g, chanbuf(c, 0))
	racereleaseg(sg.g, chanbuf(c, 0))
	raceacquire(chanbuf(c, 0))
}