// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// +build darwin dragonfly freebsd linux nacl netbsd openbsd solaris windows

package runtime

import "unsafe"

// Integrated network poller (platform-independent part).
// A particular implementation (epoll/kqueue) must define the following functions:
// func netpollinit()			// to initialize the poller
// func netpollopen(fd uintptr, pd *pollDesc) int32	// to arm edge-triggered notifications
// and associate fd with pd.
// An implementation must call the following function to denote that the pd is ready:
// func netpollready(gpp *guintptr, pd *pollDesc, mode int32)

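// For concreteness, a sketch of what netpollopen looks like in an epoll-based
// implementation (loosely modeled on the runtime's epoll poller; shown here
// for illustration only). The *pollDesc rides in the event payload so the
// poller can find it again when the fd becomes ready:
//
//	func netpollopen(fd uintptr, pd *pollDesc) int32 {
//		var ev epollevent
//		ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
//		*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
//		return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
//	}
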
// pollDesc contains 2 binary semaphores, rg and wg, to park reader and writer
// goroutines respectively. The semaphore can be in the following states:
// pdReady - io readiness notification is pending;
//           a goroutine consumes the notification by changing the state to nil.
// pdWait - a goroutine prepares to park on the semaphore, but is not yet parked;
//          the goroutine commits to park by changing the state to G pointer,
//          or, alternatively, concurrent io notification changes the state to pdReady,
//          or, alternatively, concurrent timeout/close changes the state to nil.
// G pointer - the goroutine is blocked on the semaphore;
//             io notification or timeout/close changes the state to pdReady or nil respectively
//             and unparks the goroutine.
// nil - none of the above.
const (
	pdReady uintptr = 1
	pdWait  uintptr = 2
)

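// Summary of the resulting transitions on rg/wg (restating the states above;
// see netpollblock and netpollunblock below for the CAS loops that implement them):
//
//	nil     --netpollblock-->        pdWait
//	pdWait  --netpollblockcommit-->  G pointer (goroutine parks)
//	pdWait  --io notification-->     pdReady
//	pdWait  --timeout/close-->       nil
//	G ptr   --io notification-->     pdReady (goroutine unparked)
//	G ptr   --timeout/close-->       nil (goroutine unparked)
//	pdReady --consumed-->            nil
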
const pollBlockSize = 4 * 1024

// Network poller descriptor.
type pollDesc struct {
	link *pollDesc // in pollcache, protected by pollcache.lock

	// The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
	// This fully covers the seq, rt and wt variables. fd is constant throughout the pollDesc lifetime.
	// pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
	// proceed w/o taking the lock. So closing, rg, rd, wg and wd are manipulated
	// in a lock-free way by all operations.
	// NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
	// which will blow up when the GC starts moving objects.
	lock    mutex // protects the following fields
	fd      uintptr
	closing bool
	seq     uintptr // protects from stale timers and ready notifications
	rg      uintptr // pdReady, pdWait, G waiting for read, or nil
	rt      timer   // read deadline timer (set if rt.f != nil)
	rd      int64   // read deadline
	wg      uintptr // pdReady, pdWait, G waiting for write, or nil
	wt      timer   // write deadline timer
	wd      int64   // write deadline
	user    uint32  // user settable cookie
}

type pollCache struct {
	lock  mutex
	first *pollDesc
	// pollDesc objects must be type-stable,
	// because we can get ready notifications from epoll/kqueue
	// after the descriptor is closed/reused.
	// Stale notifications are detected using the seq variable,
	// which is incremented when deadlines are changed or the descriptor is reused.
}

var (
	netpollInited uint32
	pollcache     pollCache
)

//go:linkname net_runtime_pollServerInit net.runtime_pollServerInit
func net_runtime_pollServerInit() {
	netpollinit()
	atomicstore(&netpollInited, 1)
}

func netpollinited() bool {
	return atomicload(&netpollInited) != 0
}

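// On the package net side, each go:linkname pair above appears as a plain
// function declaration. A sketch of the init pairing (loosely based on net's
// fd_poll_runtime.go; shown for illustration, not part of this file):
//
//	package net
//
//	func runtime_pollServerInit() // implemented in the runtime
//
//	var serverInit sync.Once
//
//	func (pd *pollDesc) Init(fd *netFD) error {
//		serverInit.Do(runtime_pollServerInit)
//		...
//	}
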
//go:linkname net_runtime_pollOpen net.runtime_pollOpen
func net_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
	pd := pollcache.alloc()
	lock(&pd.lock)
	if pd.wg != 0 && pd.wg != pdReady {
		throw("netpollOpen: blocked write on free descriptor")
	}
	if pd.rg != 0 && pd.rg != pdReady {
		throw("netpollOpen: blocked read on free descriptor")
	}
	pd.fd = fd
	pd.closing = false
	pd.seq++
	pd.rg = 0
	pd.rd = 0
	pd.wg = 0
	pd.wd = 0
	unlock(&pd.lock)

	errno := netpollopen(fd, pd)
	return pd, int(errno)
}

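// For illustration, the caller in package net uses the returned descriptor
// and errno roughly as follows (a sketch based on net's fd_poll_runtime.go,
// not part of this file):
//
//	ctx, errno := runtime_pollOpen(uintptr(fd.sysfd))
//	if errno != 0 {
//		return syscall.Errno(errno)
//	}
//	pd.runtimeCtx = ctx
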
//go:linkname net_runtime_pollClose net.runtime_pollClose
func net_runtime_pollClose(pd *pollDesc) {
	if !pd.closing {
		throw("netpollClose: close w/o unblock")
	}
	if pd.wg != 0 && pd.wg != pdReady {
		throw("netpollClose: blocked write on closing descriptor")
	}
	if pd.rg != 0 && pd.rg != pdReady {
		throw("netpollClose: blocked read on closing descriptor")
	}
	netpollclose(pd.fd)
	pollcache.free(pd)
}

func (c *pollCache) free(pd *pollDesc) {
	lock(&c.lock)
	pd.link = c.first
	c.first = pd
	unlock(&c.lock)
}

//go:linkname net_runtime_pollReset net.runtime_pollReset
func net_runtime_pollReset(pd *pollDesc, mode int) int {
	err := netpollcheckerr(pd, int32(mode))
	if err != 0 {
		return err
	}
	if mode == 'r' {
		pd.rg = 0
	} else if mode == 'w' {
		pd.wg = 0
	}
	return 0
}

//go:linkname net_runtime_pollWait net.runtime_pollWait
func net_runtime_pollWait(pd *pollDesc, mode int) int {
	err := netpollcheckerr(pd, int32(mode))
	if err != 0 {
		return err
	}
	// For now, only Solaris uses level-triggered IO.
	if GOOS == "solaris" {
		netpollarm(pd, mode)
	}
	for !netpollblock(pd, int32(mode), false) {
		err = netpollcheckerr(pd, int32(mode))
		if err != 0 {
			return err
		}
		// Can happen if the timeout fired and unblocked us,
		// but the timeout was reset before we had a chance to run.
		// Pretend it did not happen and retry.
	}
	return 0
}

//go:linkname net_runtime_pollWaitCanceled net.runtime_pollWaitCanceled
func net_runtime_pollWaitCanceled(pd *pollDesc, mode int) {
	// This function is used only on Windows after a failed attempt to cancel
	// a pending async IO operation. Wait for IO readiness, ignore closing or timeouts.
	for !netpollblock(pd, int32(mode), true) {
	}
}

//go:linkname net_runtime_pollSetDeadline net.runtime_pollSetDeadline
func net_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
	lock(&pd.lock)
	if pd.closing {
		unlock(&pd.lock)
		return
	}
	pd.seq++ // invalidate current timers
	// Reset current timers.
	if pd.rt.f != nil {
		deltimer(&pd.rt)
		pd.rt.f = nil
	}
	if pd.wt.f != nil {
		deltimer(&pd.wt)
		pd.wt.f = nil
	}
	// Setup new timers.
	if d != 0 && d <= nanotime() {
		d = -1
	}
	// mode is 'r', 'w', or 'r'+'w' to set the read, write, or both deadlines.
	if mode == 'r' || mode == 'r'+'w' {
		pd.rd = d
	}
	if mode == 'w' || mode == 'r'+'w' {
		pd.wd = d
	}
	if pd.rd > 0 && pd.rd == pd.wd {
		pd.rt.f = netpollDeadline
		pd.rt.when = pd.rd
		// Copy current seq into the timer arg.
		// Timer func will check the seq against current descriptor seq,
		// if they differ the descriptor was reused or timers were reset.
		pd.rt.arg = pd
		pd.rt.seq = pd.seq
		addtimer(&pd.rt)
	} else {
		if pd.rd > 0 {
			pd.rt.f = netpollReadDeadline
			pd.rt.when = pd.rd
			pd.rt.arg = pd
			pd.rt.seq = pd.seq
			addtimer(&pd.rt)
		}
		if pd.wd > 0 {
			pd.wt.f = netpollWriteDeadline
			pd.wt.when = pd.wd
			pd.wt.arg = pd
			pd.wt.seq = pd.seq
			addtimer(&pd.wt)
		}
	}
	// If we set the new deadline in the past, unblock currently pending IO, if any.
	var rg, wg *g
	atomicstorep(unsafe.Pointer(&wg), nil) // full memory barrier between stores to rd/wd and load of rg/wg in netpollunblock
	if pd.rd < 0 {
		rg = netpollunblock(pd, 'r', false)
	}
	if pd.wd < 0 {
		wg = netpollunblock(pd, 'w', false)
	}
	unlock(&pd.lock)
	if rg != nil {
		goready(rg, 3)
	}
	if wg != nil {
		goready(wg, 3)
	}
}

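// For illustration, package net maps SetReadDeadline to mode 'r',
// SetWriteDeadline to 'w', and SetDeadline to 'r'+'w', calling roughly
// (a sketch, not part of this file):
//
//	runtime_pollSetDeadline(fd.pd.runtimeCtx, d, 'r'+'w')
//
// where d is the deadline on the runtime clock, with 0 meaning no deadline.
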
//go:linkname net_runtime_pollUnblock net.runtime_pollUnblock
func net_runtime_pollUnblock(pd *pollDesc) {
	lock(&pd.lock)
	if pd.closing {
		throw("netpollUnblock: already closing")
	}
	pd.closing = true
	pd.seq++
	var rg, wg *g
	atomicstorep(unsafe.Pointer(&rg), nil) // full memory barrier between store to closing and read of rg/wg in netpollunblock
	rg = netpollunblock(pd, 'r', false)
	wg = netpollunblock(pd, 'w', false)
	if pd.rt.f != nil {
		deltimer(&pd.rt)
		pd.rt.f = nil
	}
	if pd.wt.f != nil {
		deltimer(&pd.wt)
		pd.wt.f = nil
	}
	unlock(&pd.lock)
	if rg != nil {
		goready(rg, 3)
	}
	if wg != nil {
		goready(wg, 3)
	}
}

// make pd ready; newly runnable goroutines (if any) are chained onto *gpp.
// May run during STW, so write barriers are not allowed.
//go:nowritebarrier
func netpollready(gpp *guintptr, pd *pollDesc, mode int32) {
	var rg, wg guintptr
	if mode == 'r' || mode == 'r'+'w' {
		rg.set(netpollunblock(pd, 'r', true))
	}
	if mode == 'w' || mode == 'r'+'w' {
		wg.set(netpollunblock(pd, 'w', true))
	}
	if rg != 0 {
		rg.ptr().schedlink = *gpp
		*gpp = rg
	}
	if wg != 0 {
		wg.ptr().schedlink = *gpp
		*gpp = wg
	}
}

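// A sketch of the calling side (loosely based on the epoll poller's netpoll;
// for illustration, not part of this file): the implementation decodes the
// *pollDesc and mode from each event and lets netpollready chain the newly
// runnable goroutines through gp:
//
//	var gp guintptr
//	for i := int32(0); i < n; i++ {
//		ev := &events[i]
//		var mode int32
//		if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
//			mode += 'r'
//		}
//		if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
//			mode += 'w'
//		}
//		if mode != 0 {
//			pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
//			netpollready(&gp, pd, mode)
//		}
//	}
//	// gp now heads a schedlink-chained list of runnable goroutines.
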
func netpollcheckerr(pd *pollDesc, mode int32) int {
	if pd.closing {
		return 1 // errClosing
	}
	if (mode == 'r' && pd.rd < 0) || (mode == 'w' && pd.wd < 0) {
		return 2 // errTimeout
	}
	return 0
}

// netpollblockcommit is passed to gopark by netpollblock. It commits the park
// by swinging the semaphore from pdWait to the goroutine pointer; if the CAS
// fails because a concurrent notification changed the state, gopark does not
// park and the goroutine keeps running.
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
	return casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
}

// returns true if IO is ready, or false if timed out or closed
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	// set the gpp semaphore to pdWait
	for {
		old := *gpp
		if old == pdReady {
			*gpp = 0
			return true
		}
		if old != 0 {
			throw("netpollblock: double wait")
		}
		if casuintptr(gpp, 0, pdWait) {
			break
		}
	}

	// need to recheck error states after setting gpp to pdWait
	// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
	// do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
	if waitio || netpollcheckerr(pd, mode) == 0 {
		gopark(netpollblockcommit, unsafe.Pointer(gpp), "IO wait", traceEvGoBlockNet, 5)
	}
	// be careful to not lose concurrent pdReady notification
	old := xchguintptr(gpp, 0)
	if old > pdWait {
		throw("netpollblock: corrupted state")
	}
	return old == pdReady
}

// netpollunblock returns the goroutine (if any) blocked on pd, moving the
// semaphore to pdReady when ioready is true and to nil otherwise. The caller
// is responsible for unparking the returned goroutine.
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	for {
		old := *gpp
		if old == pdReady {
			return nil
		}
		if old == 0 && !ioready {
			// Only set pdReady for ioready. runtime_pollWait
			// will check for timeout/cancel before waiting.
			return nil
		}
		var new uintptr
		if ioready {
			new = pdReady
		}
		if casuintptr(gpp, old, new) {
			if old == pdReady || old == pdWait {
				old = 0
			}
			return (*g)(unsafe.Pointer(old))
		}
	}
}

func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
	lock(&pd.lock)
	// The seq arg is the descriptor's seq at the time the timer was set.
	// If it is stale, ignore the timer event.
	if seq != pd.seq {
		// The descriptor was reused or timers were reset.
		unlock(&pd.lock)
		return
	}
	var rg *g
	if read {
		if pd.rd <= 0 || pd.rt.f == nil {
			throw("netpolldeadlineimpl: inconsistent read deadline")
		}
		pd.rd = -1
		atomicstorep(unsafe.Pointer(&pd.rt.f), nil) // full memory barrier between store to rd and load of rg in netpollunblock
		rg = netpollunblock(pd, 'r', false)
	}
	var wg *g
	if write {
		// wt.f may legitimately be nil here: when rd == wd, a single timer
		// (pd.rt, running netpollDeadline) covers both directions.
		if pd.wd <= 0 || pd.wt.f == nil && !read {
			throw("netpolldeadlineimpl: inconsistent write deadline")
		}
		pd.wd = -1
		atomicstorep(unsafe.Pointer(&pd.wt.f), nil) // full memory barrier between store to wd and load of wg in netpollunblock
		wg = netpollunblock(pd, 'w', false)
	}
	unlock(&pd.lock)
	if rg != nil {
		goready(rg, 0)
	}
	if wg != nil {
		goready(wg, 0)
	}
}

func netpollDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, true, true)
}

func netpollReadDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, true, false)
}

func netpollWriteDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, false, true)
}

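// The three adapters above match the runtime timer callback signature
// (timer.f is a func(interface{}, uintptr)). pollSetDeadline arms pd.rt with
// netpollDeadline when both deadlines coincide, and with the single-direction
// variants otherwise.
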
func (c *pollCache) alloc() *pollDesc {
	lock(&c.lock)
	if c.first == nil {
		const pdSize = unsafe.Sizeof(pollDesc{})
		n := pollBlockSize / pdSize
		if n == 0 {
			n = 1
		}
		// Must be in non-GC memory because it can be referenced
		// only from epoll/kqueue internals.
		mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
		for i := uintptr(0); i < n; i++ {
			pd := (*pollDesc)(add(mem, i*pdSize))
			pd.link = c.first
			c.first = pd
		}
	}
	pd := c.first
	c.first = pd.link
	unlock(&c.lock)
	return pd
}
    447