// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// +build darwin dragonfly freebsd linux nacl netbsd openbsd solaris windows

package runtime

import (
	"runtime/internal/atomic"
	"unsafe"
)

// Integrated network poller (platform-independent part).
// A particular implementation (epoll/kqueue) must define the following functions:
// func netpollinit()			// to initialize the poller
// func netpollopen(fd uintptr, pd *pollDesc) int32	// to arm edge-triggered notifications
//						// and associate fd with pd
// An implementation must call the following function to denote that pd is ready:
// func netpollready(gpp *guintptr, pd *pollDesc, mode int32)

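// For orientation only (illustrative sketch; only netpollready and pollDesc are
// defined in this file): when the platform poller observes that a descriptor
// has become ready, it collects the goroutines to wake roughly like this:
//
//	var gp guintptr
//	netpollready(&gp, pd, mode) // mode is 'r', 'w' or 'r'+'w'
//	// gp now heads a list of runnable goroutines linked through schedlink,
//	// which the caller hands back to the scheduler.
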
// pollDesc contains 2 binary semaphores, rg and wg, to park reader and writer
// goroutines respectively. The semaphore can be in the following states:
// pdReady - io readiness notification is pending;
//           a goroutine consumes the notification by changing the state to nil.
// pdWait - a goroutine prepares to park on the semaphore, but not yet parked;
//          the goroutine commits to park by changing the state to G pointer,
//          or, alternatively, concurrent io notification changes the state to READY,
//          or, alternatively, concurrent timeout/close changes the state to nil.
// G pointer - the goroutine is blocked on the semaphore;
//             io notification or timeout/close changes the state to READY or nil respectively
//             and unparks the goroutine.
// nil - nothing of the above.
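//
// In summary, the legal transitions of rg/wg are:
//
//	nil       -> pdWait     (a goroutine is about to park)
//	pdWait    -> G pointer  (the goroutine commits to park)
//	pdWait    -> pdReady    (concurrent io notification)
//	pdWait    -> nil        (concurrent timeout/close)
//	G pointer -> pdReady    (io notification; the goroutine is unparked)
//	G pointer -> nil        (timeout/close; the goroutine is unparked)
//	pdReady   -> nil        (the notification is consumed)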
const (
	pdReady uintptr = 1
	pdWait  uintptr = 2
)

const pollBlockSize = 4 * 1024

// Network poller descriptor.
//
// No heap pointers.
//
//go:notinheap
type pollDesc struct {
	link *pollDesc // in pollcache, protected by pollcache.lock

	// The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
	// This fully covers seq, rt and wt variables. fd is constant throughout the pollDesc lifetime.
	// pollReset, pollWait, pollWaitCanceled and netpollready (IO readiness notification)
	// proceed w/o taking the lock. So closing, rg, rd, wg and wd are manipulated
	// in a lock-free way by all operations.
	// NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
	// that will blow up when GC starts moving objects.
	lock    mutex // protects the following fields
	fd      uintptr
	closing bool
	seq     uintptr // protects from stale timers and ready notifications
	rg      uintptr // pdReady, pdWait, G waiting for read or nil
	rt      timer   // read deadline timer (set if rt.f != nil)
	rd      int64   // read deadline
	wg      uintptr // pdReady, pdWait, G waiting for write or nil
	wt      timer   // write deadline timer (set if wt.f != nil)
	wd      int64   // write deadline
	user    uint32  // user settable cookie
}

type pollCache struct {
	lock  mutex
	first *pollDesc
	// pollDesc objects must be type-stable,
	// because we can get ready notifications from epoll/kqueue
	// after the descriptor is closed/reused.
	// Stale notifications are detected using the seq variable,
	// which is incremented when deadlines are changed or the descriptor is reused.
}

var (
	netpollInited uint32
	pollcache     pollCache
)

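// net_runtime_pollServerInit initializes the platform poller (netpollinit) and
// records that it is ready; it is linknamed into the net package as
// net.runtime_pollServerInit.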
//go:linkname net_runtime_pollServerInit net.runtime_pollServerInit
func net_runtime_pollServerInit() {
	netpollinit()
	atomic.Store(&netpollInited, 1)
}

func netpollinited() bool {
	return atomic.Load(&netpollInited) != 0
}

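// net_runtime_pollOpen allocates a pollDesc for fd, resets its state, and
// registers fd with the platform poller via netpollopen. It returns the
// descriptor together with the errno reported by netpollopen (0 on success).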
//go:linkname net_runtime_pollOpen net.runtime_pollOpen
func net_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
	pd := pollcache.alloc()
	lock(&pd.lock)
	if pd.wg != 0 && pd.wg != pdReady {
		throw("netpollOpen: blocked write on free descriptor")
	}
	if pd.rg != 0 && pd.rg != pdReady {
		throw("netpollOpen: blocked read on free descriptor")
	}
	pd.fd = fd
	pd.closing = false
	pd.seq++
	pd.rg = 0
	pd.rd = 0
	pd.wg = 0
	pd.wd = 0
	unlock(&pd.lock)

	var errno int32
	errno = netpollopen(fd, pd)
	return pd, int(errno)
}

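// net_runtime_pollClose unregisters pd.fd from the platform poller and returns
// pd to the cache. The descriptor must already have been unblocked via
// net_runtime_pollUnblock and must have no goroutines parked on it.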
//go:linkname net_runtime_pollClose net.runtime_pollClose
func net_runtime_pollClose(pd *pollDesc) {
	if !pd.closing {
		throw("netpollClose: close w/o unblock")
	}
	if pd.wg != 0 && pd.wg != pdReady {
		throw("netpollClose: blocked write on closing descriptor")
	}
	if pd.rg != 0 && pd.rg != pdReady {
		throw("netpollClose: blocked read on closing descriptor")
	}
	netpollclose(pd.fd)
	pollcache.free(pd)
}

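// free returns pd to the cache's free list. The descriptor must no longer be
// registered with the platform poller.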
func (c *pollCache) free(pd *pollDesc) {
	lock(&c.lock)
	pd.link = c.first
	c.first = pd
	unlock(&c.lock)
}

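// net_runtime_pollReset prepares pd for the next read (mode 'r') or write
// (mode 'w') by clearing the corresponding semaphore. It returns the same
// error codes as netpollcheckerr (0 ok, 1 closing, 2 timed out).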
//go:linkname net_runtime_pollReset net.runtime_pollReset
func net_runtime_pollReset(pd *pollDesc, mode int) int {
	err := netpollcheckerr(pd, int32(mode))
	if err != 0 {
		return err
	}
	if mode == 'r' {
		pd.rg = 0
	} else if mode == 'w' {
		pd.wg = 0
	}
	return 0
}

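// net_runtime_pollWait blocks the calling goroutine until fd is ready for the
// given mode ('r' or 'w'), or until the descriptor is closed or its deadline
// expires, in which case the corresponding error code is returned.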
//go:linkname net_runtime_pollWait net.runtime_pollWait
func net_runtime_pollWait(pd *pollDesc, mode int) int {
	err := netpollcheckerr(pd, int32(mode))
	if err != 0 {
		return err
	}
	// For now, only Solaris uses level-triggered IO.
	if GOOS == "solaris" {
		netpollarm(pd, mode)
	}
	for !netpollblock(pd, int32(mode), false) {
		err = netpollcheckerr(pd, int32(mode))
		if err != 0 {
			return err
		}
		// Can happen if the timeout fired and unblocked us,
		// but the timeout was reset before we had a chance to run.
		// Pretend it did not happen and retry.
	}
	return 0
}

//go:linkname net_runtime_pollWaitCanceled net.runtime_pollWaitCanceled
func net_runtime_pollWaitCanceled(pd *pollDesc, mode int) {
	// This function is used only on Windows after a failed attempt to cancel
	// a pending async IO operation. Wait for ioready, ignore closing or timeouts.
	for !netpollblock(pd, int32(mode), true) {
	}
}

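// net_runtime_pollSetDeadline installs (or clears) the read and/or write
// deadline for pd. d is an absolute deadline on the nanotime clock: 0 means no
// deadline, and a value already in the past immediately times out any pending
// IO. mode is 'r', 'w' or 'r'+'w'.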
//go:linkname net_runtime_pollSetDeadline net.runtime_pollSetDeadline
func net_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
	lock(&pd.lock)
	if pd.closing {
		unlock(&pd.lock)
		return
	}
	pd.seq++ // invalidate current timers
	// Reset current timers.
	if pd.rt.f != nil {
		deltimer(&pd.rt)
		pd.rt.f = nil
	}
	if pd.wt.f != nil {
		deltimer(&pd.wt)
		pd.wt.f = nil
	}
	// Set up new timers.
	if d != 0 && d <= nanotime() {
		d = -1
	}
	if mode == 'r' || mode == 'r'+'w' {
		pd.rd = d
	}
	if mode == 'w' || mode == 'r'+'w' {
		pd.wd = d
	}
	if pd.rd > 0 && pd.rd == pd.wd {
		pd.rt.f = netpollDeadline
		pd.rt.when = pd.rd
		// Copy current seq into the timer arg.
		// Timer func will check the seq against current descriptor seq,
		// if they differ the descriptor was reused or timers were reset.
		pd.rt.arg = pd
		pd.rt.seq = pd.seq
		addtimer(&pd.rt)
	} else {
		if pd.rd > 0 {
			pd.rt.f = netpollReadDeadline
			pd.rt.when = pd.rd
			pd.rt.arg = pd
			pd.rt.seq = pd.seq
			addtimer(&pd.rt)
		}
		if pd.wd > 0 {
			pd.wt.f = netpollWriteDeadline
			pd.wt.when = pd.wd
			pd.wt.arg = pd
			pd.wt.seq = pd.seq
			addtimer(&pd.wt)
		}
	}
	// If we set the new deadline in the past, unblock currently pending IO if any.
	var rg, wg *g
	atomicstorep(unsafe.Pointer(&wg), nil) // full memory barrier between stores to rd/wd and load of rg/wg in netpollunblock
	if pd.rd < 0 {
		rg = netpollunblock(pd, 'r', false)
	}
	if pd.wd < 0 {
		wg = netpollunblock(pd, 'w', false)
	}
	unlock(&pd.lock)
	if rg != nil {
		goready(rg, 3)
	}
	if wg != nil {
		goready(wg, 3)
	}
}

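// net_runtime_pollUnblock marks pd as closing, wakes any goroutines parked on
// it, and stops its deadline timers. After this, net_runtime_pollClose can be
// called to release the descriptor.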
//go:linkname net_runtime_pollUnblock net.runtime_pollUnblock
func net_runtime_pollUnblock(pd *pollDesc) {
	lock(&pd.lock)
	if pd.closing {
		throw("netpollUnblock: already closing")
	}
	pd.closing = true
	pd.seq++
	var rg, wg *g
	atomicstorep(unsafe.Pointer(&rg), nil) // full memory barrier between store to closing and read of rg/wg in netpollunblock
	rg = netpollunblock(pd, 'r', false)
	wg = netpollunblock(pd, 'w', false)
	if pd.rt.f != nil {
		deltimer(&pd.rt)
		pd.rt.f = nil
	}
	if pd.wt.f != nil {
		deltimer(&pd.wt)
		pd.wt.f = nil
	}
	unlock(&pd.lock)
	if rg != nil {
		goready(rg, 3)
	}
	if wg != nil {
		goready(wg, 3)
	}
}

// netpollready makes pd ready; newly runnable goroutines (if any) are
// prepended to the *gpp list.
// May run during STW, so write barriers are not allowed.
//go:nowritebarrier
func netpollready(gpp *guintptr, pd *pollDesc, mode int32) {
	var rg, wg guintptr
	if mode == 'r' || mode == 'r'+'w' {
		rg.set(netpollunblock(pd, 'r', true))
	}
	if mode == 'w' || mode == 'r'+'w' {
		wg.set(netpollunblock(pd, 'w', true))
	}
	if rg != 0 {
		rg.ptr().schedlink = *gpp
		*gpp = rg
	}
	if wg != 0 {
		wg.ptr().schedlink = *gpp
		*gpp = wg
	}
}

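// netpollcheckerr reports whether IO on pd can proceed in the given mode:
// 0 means it can, 1 means the descriptor is closing (errClosing), and
// 2 means the deadline has expired (errTimeout).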
func netpollcheckerr(pd *pollDesc, mode int32) int {
	if pd.closing {
		return 1 // errClosing
	}
	if (mode == 'r' && pd.rd < 0) || (mode == 'w' && pd.wd < 0) {
		return 2 // errTimeout
	}
	return 0
}

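// netpollblockcommit is the gopark commit function used by netpollblock. It
// publishes the parking goroutine into the semaphore by CASing it over pdWait;
// if the CAS fails (a concurrent notification or timeout already changed the
// state), it returns false and the goroutine does not park.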
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
	return atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
}

// netpollblock returns true if IO is ready, or false if it timed out or the
// descriptor was closed.
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	// set the gpp semaphore to pdWait
	for {
		old := *gpp
		if old == pdReady {
			*gpp = 0
			return true
		}
		if old != 0 {
			throw("netpollblock: double wait")
		}
		if atomic.Casuintptr(gpp, 0, pdWait) {
			break
		}
	}

	// need to recheck error states after setting gpp to pdWait
	// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
	// do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
	if waitio || netpollcheckerr(pd, mode) == 0 {
		gopark(netpollblockcommit, unsafe.Pointer(gpp), "IO wait", traceEvGoBlockNet, 5)
	}
	// be careful not to lose a concurrent pdReady notification
	old := atomic.Xchguintptr(gpp, 0)
	if old > pdWait {
		throw("netpollblock: corrupted state")
	}
	return old == pdReady
}

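// netpollunblock takes a goroutine out of the rg/wg semaphore selected by mode
// and returns it so the caller can make it runnable, or returns nil if no
// goroutine was parked there. With ioready set, the semaphore is left in
// pdReady so the notification is not lost; without ioready (timeout/close),
// the parked goroutine's slot is reset to nil, and an already-pending pdReady
// notification is left in place.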
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	for {
		old := *gpp
		if old == pdReady {
			return nil
		}
		if old == 0 && !ioready {
			// Only set READY for ioready. runtime_pollWait
			// will check for timeout/cancel before waiting.
			return nil
		}
		var new uintptr
		if ioready {
			new = pdReady
		}
		if atomic.Casuintptr(gpp, old, new) {
			if old == pdReady || old == pdWait {
				old = 0
			}
			return (*g)(unsafe.Pointer(old))
		}
	}
}

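// netpolldeadlineimpl is the common body of the deadline timer callbacks. seq
// is the descriptor's seq value captured when the timer was installed; a stale
// seq means the descriptor was reused or the timers were reset, and the event
// is ignored. Otherwise the expired deadline is set to -1 (which makes
// netpollcheckerr report a timeout) and any parked goroutine is woken.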
func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
	lock(&pd.lock)
	// Seq arg is seq when the timer was set.
	// If it's stale, ignore the timer event.
	if seq != pd.seq {
		// The descriptor was reused or timers were reset.
		unlock(&pd.lock)
		return
	}
	var rg *g
	if read {
		if pd.rd <= 0 || pd.rt.f == nil {
			throw("netpolldeadlineimpl: inconsistent read deadline")
		}
		pd.rd = -1
		atomicstorep(unsafe.Pointer(&pd.rt.f), nil) // full memory barrier between store to rd and load of rg in netpollunblock
		rg = netpollunblock(pd, 'r', false)
	}
	var wg *g
	if write {
		if pd.wd <= 0 || pd.wt.f == nil && !read {
			throw("netpolldeadlineimpl: inconsistent write deadline")
		}
		pd.wd = -1
		atomicstorep(unsafe.Pointer(&pd.wt.f), nil) // full memory barrier between store to wd and load of wg in netpollunblock
		wg = netpollunblock(pd, 'w', false)
	}
	unlock(&pd.lock)
	if rg != nil {
		goready(rg, 0)
	}
	if wg != nil {
		goready(wg, 0)
	}
}

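// netpollDeadline, netpollReadDeadline and netpollWriteDeadline are the timer
// callbacks installed by net_runtime_pollSetDeadline; they differ only in
// which deadlines they fire.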
func netpollDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, true, true)
}

func netpollReadDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, true, false)
}

func netpollWriteDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, false, true)
}

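// alloc returns a pollDesc from the cache's free list, growing the list with a
// fresh persistentalloc'd block when it is empty. The memory is never freed or
// garbage collected, which keeps pollDesc objects type-stable (see the
// pollCache comment above).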
func (c *pollCache) alloc() *pollDesc {
	lock(&c.lock)
	if c.first == nil {
		const pdSize = unsafe.Sizeof(pollDesc{})
		n := pollBlockSize / pdSize
		if n == 0 {
			n = 1
		}
		// Must be in non-GC memory because it can be referenced
		// only from epoll/kqueue internals.
		mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
		for i := uintptr(0); i < n; i++ {
			pd := (*pollDesc)(add(mem, i*pdSize))
			pd.link = c.first
			c.first = pd
		}
	}
	pd := c.first
	c.first = pd.link
	unlock(&c.lock)
	return pd
}