Home | History | Annotate | Download | only in runtime
      1 // Copyright 2012 The Go Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style
      3 // license that can be found in the LICENSE file.
      4 
      5 // Parallel for algorithm.
      6 
      7 package runtime
      8 
      9 // A parfor holds state for the parallel for operation.
     10 type parfor struct {
     11 	body   func(*parfor, uint32) // executed for each element
     12 	done   uint32                // number of idle threads
     13 	nthr   uint32                // total number of threads
     14 	thrseq uint32                // thread id sequencer
     15 	cnt    uint32                // iteration space [0, cnt)
     16 	wait   bool                  // if true, wait while all threads finish processing,
     17 	// otherwise parfor may return while other threads are still working
     18 
     19 	thr []parforthread // thread descriptors
     20 
     21 	// stats
     22 	nsteal     uint64
     23 	nstealcnt  uint64
     24 	nprocyield uint64
     25 	nosyield   uint64
     26 	nsleep     uint64
     27 }
     28 
     29 // A parforthread holds state for a single thread in the parallel for.
     30 type parforthread struct {
     31 	// the thread's iteration space [32lsb, 32msb)
     32 	pos uint64
     33 	// stats
     34 	nsteal     uint64
     35 	nstealcnt  uint64
     36 	nprocyield uint64
     37 	nosyield   uint64
     38 	nsleep     uint64
     39 	pad        [_CacheLineSize]byte
     40 }
     41 
     42 func parforalloc(nthrmax uint32) *parfor {
     43 	return &parfor{
     44 		thr: make([]parforthread, nthrmax),
     45 	}
     46 }
     47 
     48 // Parforsetup initializes desc for a parallel for operation with nthr
     49 // threads executing n jobs.
     50 //
     51 // On return the nthr threads are each expected to call parfordo(desc)
     52 // to run the operation. During those calls, for each i in [0, n), one
     53 // thread will be used invoke body(desc, i).
     54 // If wait is true, no parfordo will return until all work has been completed.
     55 // If wait is false, parfordo may return when there is a small amount
     56 // of work left, under the assumption that another thread has that
     57 // work well in hand.
     58 func parforsetup(desc *parfor, nthr, n uint32, wait bool, body func(*parfor, uint32)) {
     59 	if desc == nil || nthr == 0 || nthr > uint32(len(desc.thr)) || body == nil {
     60 		print("desc=", desc, " nthr=", nthr, " count=", n, " body=", body, "\n")
     61 		throw("parfor: invalid args")
     62 	}
     63 
     64 	desc.body = body
     65 	desc.done = 0
     66 	desc.nthr = nthr
     67 	desc.thrseq = 0
     68 	desc.cnt = n
     69 	desc.wait = wait
     70 	desc.nsteal = 0
     71 	desc.nstealcnt = 0
     72 	desc.nprocyield = 0
     73 	desc.nosyield = 0
     74 	desc.nsleep = 0
     75 
     76 	for i := range desc.thr {
     77 		begin := uint32(uint64(n) * uint64(i) / uint64(nthr))
     78 		end := uint32(uint64(n) * uint64(i+1) / uint64(nthr))
     79 		desc.thr[i].pos = uint64(begin) | uint64(end)<<32
     80 	}
     81 }
     82 
     83 func parfordo(desc *parfor) {
     84 	// Obtain 0-based thread index.
     85 	tid := xadd(&desc.thrseq, 1) - 1
     86 	if tid >= desc.nthr {
     87 		print("tid=", tid, " nthr=", desc.nthr, "\n")
     88 		throw("parfor: invalid tid")
     89 	}
     90 
     91 	// If single-threaded, just execute the for serially.
     92 	body := desc.body
     93 	if desc.nthr == 1 {
     94 		for i := uint32(0); i < desc.cnt; i++ {
     95 			body(desc, i)
     96 		}
     97 		return
     98 	}
     99 
    100 	me := &desc.thr[tid]
    101 	mypos := &me.pos
    102 	for {
    103 		for {
    104 			// While there is local work,
    105 			// bump low index and execute the iteration.
    106 			pos := xadd64(mypos, 1)
    107 			begin := uint32(pos) - 1
    108 			end := uint32(pos >> 32)
    109 			if begin < end {
    110 				body(desc, begin)
    111 				continue
    112 			}
    113 			break
    114 		}
    115 
    116 		// Out of work, need to steal something.
    117 		idle := false
    118 		for try := uint32(0); ; try++ {
    119 			// If we don't see any work for long enough,
    120 			// increment the done counter...
    121 			if try > desc.nthr*4 && !idle {
    122 				idle = true
    123 				xadd(&desc.done, 1)
    124 			}
    125 
    126 			// ...if all threads have incremented the counter,
    127 			// we are done.
    128 			extra := uint32(0)
    129 			if !idle {
    130 				extra = 1
    131 			}
    132 			if desc.done+extra == desc.nthr {
    133 				if !idle {
    134 					xadd(&desc.done, 1)
    135 				}
    136 				goto exit
    137 			}
    138 
    139 			// Choose a random victim for stealing.
    140 			var begin, end uint32
    141 			victim := fastrand1() % (desc.nthr - 1)
    142 			if victim >= tid {
    143 				victim++
    144 			}
    145 			victimpos := &desc.thr[victim].pos
    146 			for {
    147 				// See if it has any work.
    148 				pos := atomicload64(victimpos)
    149 				begin = uint32(pos)
    150 				end = uint32(pos >> 32)
    151 				if begin+1 >= end {
    152 					end = 0
    153 					begin = end
    154 					break
    155 				}
    156 				if idle {
    157 					xadd(&desc.done, -1)
    158 					idle = false
    159 				}
    160 				begin2 := begin + (end-begin)/2
    161 				newpos := uint64(begin) | uint64(begin2)<<32
    162 				if cas64(victimpos, pos, newpos) {
    163 					begin = begin2
    164 					break
    165 				}
    166 			}
    167 			if begin < end {
    168 				// Has successfully stolen some work.
    169 				if idle {
    170 					throw("parfor: should not be idle")
    171 				}
    172 				atomicstore64(mypos, uint64(begin)|uint64(end)<<32)
    173 				me.nsteal++
    174 				me.nstealcnt += uint64(end) - uint64(begin)
    175 				break
    176 			}
    177 
    178 			// Backoff.
    179 			if try < desc.nthr {
    180 				// nothing
    181 			} else if try < 4*desc.nthr {
    182 				me.nprocyield++
    183 				procyield(20)
    184 			} else if !desc.wait {
    185 				// If a caller asked not to wait for the others, exit now
    186 				// (assume that most work is already done at this point).
    187 				if !idle {
    188 					xadd(&desc.done, 1)
    189 				}
    190 				goto exit
    191 			} else if try < 6*desc.nthr {
    192 				me.nosyield++
    193 				osyield()
    194 			} else {
    195 				me.nsleep++
    196 				usleep(1)
    197 			}
    198 		}
    199 	}
    200 
    201 exit:
    202 	xadd64(&desc.nsteal, int64(me.nsteal))
    203 	xadd64(&desc.nstealcnt, int64(me.nstealcnt))
    204 	xadd64(&desc.nprocyield, int64(me.nprocyield))
    205 	xadd64(&desc.nosyield, int64(me.nosyield))
    206 	xadd64(&desc.nsleep, int64(me.nsleep))
    207 	me.nsteal = 0
    208 	me.nstealcnt = 0
    209 	me.nprocyield = 0
    210 	me.nosyield = 0
    211 	me.nsleep = 0
    212 }
    213