// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// +build darwin dragonfly freebsd linux nacl netbsd openbsd solaris windows

package runtime

import (
	"runtime/internal/atomic"
	"unsafe"
)

// Integrated network poller (platform-independent part).
// A particular implementation (epoll/kqueue) must define the following functions:
// func netpollinit()			// to initialize the poller
// func netpollopen(fd uintptr, pd *pollDesc) int32	// to arm edge-triggered notifications
// and associate fd with pd.
// An implementation must call the following function to denote that the pd is ready.
// func netpollready(gpp *guintptr, pd *pollDesc, mode int32)

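// As a rough illustration of that contract (a sketch modeled on the Linux
// epoll backend; epollcreate1/epollctl/epollevent and the _EPOLL* constants
// belong to that backend, not to the portable interface):
//
//	func netpollinit() {
//		// create the poller instance once, before any netpollopen call
//		epfd = epollcreate1(_EPOLL_CLOEXEC)
//		...
//	}
//
//	func netpollopen(fd uintptr, pd *pollDesc) int32 {
//		var ev epollevent
//		// edge-triggered: readiness is reported once per transition
//		ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
//		*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd // associate fd with pd
//		return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
//	}
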
// pollDesc contains 2 binary semaphores, rg and wg, to park reader and writer
// goroutines respectively. The semaphore can be in the following states:
// pdReady - io readiness notification is pending;
//           a goroutine consumes the notification by changing the state to nil.
// pdWait - a goroutine prepares to park on the semaphore, but not yet parked;
//          the goroutine commits to park by changing the state to G pointer,
//          or, alternatively, concurrent io notification changes the state to READY,
//          or, alternatively, concurrent timeout/close changes the state to nil.
// G pointer - the goroutine is blocked on the semaphore;
//             io notification or timeout/close changes the state to READY or nil respectively
//             and unparks the goroutine.
// nil - nothing of the above.
const (
	pdReady uintptr = 1
	pdWait  uintptr = 2
)

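// Summarizing the comment above, the transitions for one semaphore (rg or wg)
// are roughly (illustrative only; "G" stands for a goroutine pointer stored
// as a uintptr):
//
//	state     event                          next state
//	nil       io ready (netpollunblock)      pdReady
//	nil       netpollblock starts waiting    pdWait
//	pdWait    goroutine commits to park      G
//	pdWait    io ready                       pdReady (waiter does not park)
//	pdWait    timeout/close                  nil (waiter does not park)
//	G         io ready                       pdReady, G unparked
//	G         timeout/close                  nil, G unparked
//	pdReady   consumed by netpollblock       nil
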
const pollBlockSize = 4 * 1024

// Network poller descriptor.
//
// No heap pointers.
//
//go:notinheap
type pollDesc struct {
	link *pollDesc // in pollcache, protected by pollcache.lock

	// The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
	// This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
	// pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
	// proceed w/o taking the lock. So closing, rg, rd, wg and wd are manipulated
	// in a lock-free way by all operations.
	// NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
	// that will blow up when GC starts moving objects.
	lock    mutex // protects the following fields
	fd      uintptr
	closing bool
	seq     uintptr // protects from stale timers and ready notifications
	rg      uintptr // pdReady, pdWait, G waiting for read or nil
	rt      timer   // read deadline timer (set if rt.f != nil)
	rd      int64   // read deadline
	wg      uintptr // pdReady, pdWait, G waiting for write or nil
	wt      timer   // write deadline timer
	wd      int64   // write deadline
	user    uint32  // user settable cookie
}

type pollCache struct {
	lock  mutex
	first *pollDesc
	// PollDesc objects must be type-stable,
	// because we can get ready notification from epoll/kqueue
	// after the descriptor is closed/reused.
	// Stale notifications are detected using seq variable,
	// seq is incremented when deadlines are changed or descriptor is reused.
}

var (
	netpollInited  uint32
	pollcache      pollCache
	netpollWaiters uint32
)

//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
	netpollinit()
	atomic.Store(&netpollInited, 1)
}

func netpollinited() bool {
	return atomic.Load(&netpollInited) != 0
}

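// The //go:linkname directives in this file export these functions to package
// internal/poll, which declares them without bodies and treats *pollDesc as an
// opaque uintptr context. Roughly, the caller side looks like the following
// (sketch of internal/poll for orientation only; details vary by version):
//
//	type pollDesc struct{ runtimeCtx uintptr }
//
//	func runtime_pollServerInit()
//	func runtime_pollOpen(fd uintptr) (uintptr, int)
//	func runtime_pollWait(ctx uintptr, mode int) int
//
//	func (pd *pollDesc) init(fd *FD) error {
//		serverInit.Do(runtime_pollServerInit)
//		ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
//		...
//	}
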
//go:linkname poll_runtime_pollServerDescriptor internal/poll.runtime_pollServerDescriptor

// poll_runtime_pollServerDescriptor returns the descriptor being used,
// or ^uintptr(0) if the system does not use a poll descriptor.
func poll_runtime_pollServerDescriptor() uintptr {
	return netpolldescriptor()
}

//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
	pd := pollcache.alloc()
	lock(&pd.lock)
	if pd.wg != 0 && pd.wg != pdReady {
		throw("runtime: blocked write on free polldesc")
	}
	if pd.rg != 0 && pd.rg != pdReady {
		throw("runtime: blocked read on free polldesc")
	}
	pd.fd = fd
	pd.closing = false
	pd.seq++
	pd.rg = 0
	pd.rd = 0
	pd.wg = 0
	pd.wd = 0
	unlock(&pd.lock)

	var errno int32
	errno = netpollopen(fd, pd)
	return pd, int(errno)
}

//go:linkname poll_runtime_pollClose internal/poll.runtime_pollClose
func poll_runtime_pollClose(pd *pollDesc) {
	if !pd.closing {
		throw("runtime: close polldesc w/o unblock")
	}
	if pd.wg != 0 && pd.wg != pdReady {
		throw("runtime: blocked write on closing polldesc")
	}
	if pd.rg != 0 && pd.rg != pdReady {
		throw("runtime: blocked read on closing polldesc")
	}
	netpollclose(pd.fd)
	pollcache.free(pd)
}

func (c *pollCache) free(pd *pollDesc) {
	lock(&c.lock)
	pd.link = c.first
	c.first = pd
	unlock(&c.lock)
}

//go:linkname poll_runtime_pollReset internal/poll.runtime_pollReset
func poll_runtime_pollReset(pd *pollDesc, mode int) int {
	err := netpollcheckerr(pd, int32(mode))
	if err != 0 {
		return err
	}
	if mode == 'r' {
		pd.rg = 0
	} else if mode == 'w' {
		pd.wg = 0
	}
	return 0
}

//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
	err := netpollcheckerr(pd, int32(mode))
	if err != 0 {
		return err
	}
	// For now, only Solaris uses level-triggered IO.
	if GOOS == "solaris" {
		netpollarm(pd, mode)
	}
	for !netpollblock(pd, int32(mode), false) {
		err = netpollcheckerr(pd, int32(mode))
		if err != 0 {
			return err
		}
		// This can happen if the timeout fired and unblocked us,
		// but the timeout was reset before we had a chance to run.
		// Pretend it did not happen and retry.
	}
	return 0
}

//go:linkname poll_runtime_pollWaitCanceled internal/poll.runtime_pollWaitCanceled
func poll_runtime_pollWaitCanceled(pd *pollDesc, mode int) {
	// This function is used only on windows after a failed attempt to cancel
	// a pending async IO operation. Wait for ioready, ignore closing or timeouts.
	for !netpollblock(pd, int32(mode), true) {
	}
}

//go:linkname poll_runtime_pollSetDeadline internal/poll.runtime_pollSetDeadline
func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
	lock(&pd.lock)
	if pd.closing {
		unlock(&pd.lock)
		return
	}
	pd.seq++ // invalidate current timers
	// Reset current timers.
	if pd.rt.f != nil {
		deltimer(&pd.rt)
		pd.rt.f = nil
	}
	if pd.wt.f != nil {
		deltimer(&pd.wt)
		pd.wt.f = nil
	}
	// Setup new timers.
	if d != 0 && d <= nanotime() {
		d = -1
	}
	if mode == 'r' || mode == 'r'+'w' {
		pd.rd = d
	}
	if mode == 'w' || mode == 'r'+'w' {
		pd.wd = d
	}
	if pd.rd > 0 && pd.rd == pd.wd {
		pd.rt.f = netpollDeadline
		pd.rt.when = pd.rd
		// Copy current seq into the timer arg.
		// Timer func will check the seq against current descriptor seq,
		// if they differ the descriptor was reused or timers were reset.
		pd.rt.arg = pd
		pd.rt.seq = pd.seq
		addtimer(&pd.rt)
	} else {
		if pd.rd > 0 {
			pd.rt.f = netpollReadDeadline
			pd.rt.when = pd.rd
			pd.rt.arg = pd
			pd.rt.seq = pd.seq
			addtimer(&pd.rt)
		}
		if pd.wd > 0 {
			pd.wt.f = netpollWriteDeadline
			pd.wt.when = pd.wd
			pd.wt.arg = pd
			pd.wt.seq = pd.seq
			addtimer(&pd.wt)
		}
	}
	// If we set the new deadline in the past, unblock currently pending IO if any.
	var rg, wg *g
	atomicstorep(unsafe.Pointer(&wg), nil) // full memory barrier between stores to rd/wd and load of rg/wg in netpollunblock
	if pd.rd < 0 {
		rg = netpollunblock(pd, 'r', false)
	}
	if pd.wd < 0 {
		wg = netpollunblock(pd, 'w', false)
	}
	unlock(&pd.lock)
	if rg != nil {
		netpollgoready(rg, 3)
	}
	if wg != nil {
		netpollgoready(wg, 3)
	}
}

//go:linkname poll_runtime_pollUnblock internal/poll.runtime_pollUnblock
func poll_runtime_pollUnblock(pd *pollDesc) {
	lock(&pd.lock)
	if pd.closing {
		throw("runtime: unblock on closing polldesc")
	}
	pd.closing = true
	pd.seq++
	var rg, wg *g
	atomicstorep(unsafe.Pointer(&rg), nil) // full memory barrier between store to closing and read of rg/wg in netpollunblock
	rg = netpollunblock(pd, 'r', false)
	wg = netpollunblock(pd, 'w', false)
	if pd.rt.f != nil {
		deltimer(&pd.rt)
		pd.rt.f = nil
	}
	if pd.wt.f != nil {
		deltimer(&pd.wt)
		pd.wt.f = nil
	}
	unlock(&pd.lock)
	if rg != nil {
		netpollgoready(rg, 3)
	}
	if wg != nil {
		netpollgoready(wg, 3)
	}
}

// make pd ready; newly runnable goroutines (if any) are added to the gpp list
// May run during STW, so write barriers are not allowed.
//go:nowritebarrier
func netpollready(gpp *guintptr, pd *pollDesc, mode int32) {
	var rg, wg guintptr
	if mode == 'r' || mode == 'r'+'w' {
		rg.set(netpollunblock(pd, 'r', true))
	}
	if mode == 'w' || mode == 'r'+'w' {
		wg.set(netpollunblock(pd, 'w', true))
	}
	if rg != 0 {
		rg.ptr().schedlink = *gpp
		*gpp = rg
	}
	if wg != 0 {
		wg.ptr().schedlink = *gpp
		*gpp = wg
	}
}

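// For orientation: a platform poller's netpoll function feeds ready descriptors
// into netpollready to build the list of runnable goroutines, roughly as below
// (sketch based on the epoll backend; the event constants are that backend's):
//
//	var gp guintptr
//	for i := int32(0); i < n; i++ {
//		ev := &events[i]
//		var mode int32
//		if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
//			mode += 'r'
//		}
//		if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
//			mode += 'w'
//		}
//		if mode != 0 {
//			pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
//			netpollready(&gp, pd, mode) // prepend any woken goroutines to gp
//		}
//	}
//	return gp.ptr() // the scheduler injects this list into the run queue
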
func netpollcheckerr(pd *pollDesc, mode int32) int {
	if pd.closing {
		return 1 // errClosing
	}
	if (mode == 'r' && pd.rd < 0) || (mode == 'w' && pd.wd < 0) {
		return 2 // errTimeout
	}
	return 0
}

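// The integer codes returned above are translated back into errors by the
// internal/poll caller; approximately (sketch, not the literal implementation):
//
//	switch res {
//	case 0:
//		return nil
//	case 1:
//		return ErrClosing // descriptor was closed or unblocked
//	case 2:
//		return ErrTimeout // deadline already expired
//	}
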
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
	r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
	if r {
		// Bump the count of goroutines waiting for the poller.
		// The scheduler uses this to decide whether to block
		// waiting for the poller if there is nothing else to do.
		atomic.Xadd(&netpollWaiters, 1)
	}
	return r
}

func netpollgoready(gp *g, traceskip int) {
	atomic.Xadd(&netpollWaiters, -1)
	goready(gp, traceskip+1)
}

// returns true if IO is ready, or false if timed out or closed
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	// set the gpp semaphore to WAIT
	for {
		old := *gpp
		if old == pdReady {
			*gpp = 0
			return true
		}
		if old != 0 {
			throw("runtime: double wait")
		}
		if atomic.Casuintptr(gpp, 0, pdWait) {
			break
		}
	}

	// need to recheck error states after setting gpp to WAIT
	// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
	// do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
	if waitio || netpollcheckerr(pd, mode) == 0 {
		gopark(netpollblockcommit, unsafe.Pointer(gpp), "IO wait", traceEvGoBlockNet, 5)
	}
	// be careful to not lose concurrent READY notification
	old := atomic.Xchguintptr(gpp, 0)
	if old > pdWait {
		throw("runtime: corrupted polldesc")
	}
	return old == pdReady
}

func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	for {
		old := *gpp
		if old == pdReady {
			return nil
		}
		if old == 0 && !ioready {
			// Only set READY for ioready. runtime_pollWait
			// will check for timeout/cancel before waiting.
			return nil
		}
		var new uintptr
		if ioready {
			new = pdReady
		}
		if atomic.Casuintptr(gpp, old, new) {
			if old == pdReady || old == pdWait {
				old = 0
			}
			return (*g)(unsafe.Pointer(old))
		}
	}
}

func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
	lock(&pd.lock)
	// Seq arg is seq when the timer was set.
	// If it's stale, ignore the timer event.
	if seq != pd.seq {
		// The descriptor was reused or timers were reset.
		unlock(&pd.lock)
		return
	}
	var rg *g
	if read {
		if pd.rd <= 0 || pd.rt.f == nil {
			throw("runtime: inconsistent read deadline")
		}
		pd.rd = -1
		atomicstorep(unsafe.Pointer(&pd.rt.f), nil) // full memory barrier between store to rd and load of rg in netpollunblock
		rg = netpollunblock(pd, 'r', false)
	}
	var wg *g
	if write {
		if pd.wd <= 0 || pd.wt.f == nil && !read {
			throw("runtime: inconsistent write deadline")
		}
		pd.wd = -1
		atomicstorep(unsafe.Pointer(&pd.wt.f), nil) // full memory barrier between store to wd and load of wg in netpollunblock
		wg = netpollunblock(pd, 'w', false)
	}
	unlock(&pd.lock)
	if rg != nil {
		netpollgoready(rg, 0)
	}
	if wg != nil {
		netpollgoready(wg, 0)
	}
}

func netpollDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, true, true)
}

func netpollReadDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, true, false)
}

func netpollWriteDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, false, true)
}

func (c *pollCache) alloc() *pollDesc {
	lock(&c.lock)
	if c.first == nil {
		const pdSize = unsafe.Sizeof(pollDesc{})
		n := pollBlockSize / pdSize
		if n == 0 {
			n = 1
		}
		// Must be in non-GC memory because can be referenced
		// only from epoll/kqueue internals.
		mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
		for i := uintptr(0); i < n; i++ {
			pd := (*pollDesc)(add(mem, i*pdSize))
			pd.link = c.first
			c.first = pd
		}
	}
	pd := c.first
	c.first = pd.link
	unlock(&c.lock)
	return pd
}