Lines Matching refs:wm
57 wm *workerManager
86 func newWorker(wm *workerManager) *worker {
88 wm: wm,
102 w.wm.ReportResult(w, j, err)
181 func (wm *workerManager) handleJobs() error {
183 if len(wm.freeWorkers) == 0 {
186 if wm.readyQueue.Len() == 0 {
189 j := heap.Pop(&wm.readyQueue).(*job)
193 w := wm.freeWorkers[0]
194 wm.freeWorkers = wm.freeWorkers[1:]
195 wm.busyWorkers[w] = true
200 func (wm *workerManager) updateParents(j *job) {
207 wm.maybePushToReadyQueue(p)
231 wm := &workerManager{
242 wm.busyWorkers = make(map[*worker]bool)
244 w := newWorker(wm)
245 wm.freeWorkers = append(wm.freeWorkers, w)
248 heap.Init(&wm.readyQueue)
249 go wm.Run()
250 return wm, nil
266 func (wm *workerManager) hasTodo() bool {
267 return wm.finishCnt != len(wm.jobs)
270 func (wm *workerManager) maybePushToReadyQueue(j *job) {
274 heap.Push(&wm.readyQueue, j)
278 func (wm *workerManager) handleNewDep(j *job, neededBy *job) {
282 panic("FIXME: already in WM... can this happen?")
289 func (wm *workerManager) Run() {
293 for wm.hasTodo() || len(wm.busyWorkers) > 0 || len(wm.runnings) > 0 || !done {
295 case j := <-wm.jobChan:
297 j.id = len(wm.jobs) + 1
298 wm.jobs = append(wm.jobs, j)
299 wm.maybePushToReadyQueue(j)
300 case jr := <-wm.resultChan:
302 delete(wm.busyWorkers, jr.w)
303 wm.freeWorkers = append(wm.freeWorkers, jr.w)
304 wm.updateParents(jr.j)
305 wm.finishCnt++
307 wm.skipCnt++
312 close(wm.stopChan)
315 case af := <-wm.newDepChan:
316 wm.handleNewDep(af.j, af.neededBy)
318 case done = <-wm.waitChan:
320 err = wm.handleJobs()
325 glog.V(1).Infof("job=%d ready=%d free=%d busy=%d", len(wm.jobs)-wm.finishCnt, wm.readyQueue.Len(), len(wm.freeWorkers), len(wm.busyWorkers))
328 <-wm.waitChan
331 for _, w := range wm.freeWorkers {
334 for w := range wm.busyWorkers {
337 wm.doneChan <- err
340 func (wm *workerManager) PostJob(j *job) error {
342 case wm.jobChan <- j:
344 case <-wm.stopChan:
349 func (wm *workerManager) ReportResult(w *worker, j *job, err error) {
351 case wm.resultChan <- jobResult{w: w, j: j, err: err}:
352 case <-wm.stopChan:
356 func (wm *workerManager) ReportNewDep(j *job, neededBy *job) {
358 case wm.newDepChan <- newDep{j: j, neededBy: neededBy}:
359 case <-wm.stopChan:
363 func (wm *workerManager) Wait() (int, error) {
364 wm.waitChan <- true
365 err := <-wm.doneChan
366 glog.V(2).Infof("finish %d skip %d", wm.finishCnt, wm.skipCnt)
367 return wm.finishCnt - wm.skipCnt, err