Home | History | Annotate | Download | only in kati
      1 // Copyright 2015 Google Inc. All rights reserved
      2 //
      3 // Licensed under the Apache License, Version 2.0 (the "License");
      4 // you may not use this file except in compliance with the License.
      5 // You may obtain a copy of the License at
      6 //
      7 //      http://www.apache.org/licenses/LICENSE-2.0
      8 //
      9 // Unless required by applicable law or agreed to in writing, software
     10 // distributed under the License is distributed on an "AS IS" BASIS,
     11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 // See the License for the specific language governing permissions and
     13 // limitations under the License.
     14 
     15 package kati
     16 
     17 import (
     18 	"container/heap"
     19 	"errors"
     20 	"fmt"
     21 	"os"
     22 	"os/exec"
     23 	"syscall"
     24 	"time"
     25 
     26 	"github.com/golang/glog"
     27 )
     28 
     29 var (
     30 	errNothingDone = errors.New("nothing done")
     31 )
     32 
     33 type job struct {
     34 	n        *DepNode
     35 	ex       *Executor
     36 	parents  []*job
     37 	outputTs int64
     38 	numDeps  int
     39 	depsTs   int64
     40 	id       int
     41 
     42 	runners []runner
     43 }
     44 
     45 type jobResult struct {
     46 	j   *job
     47 	w   *worker
     48 	err error
     49 }
     50 
     51 type newDep struct {
     52 	j        *job
     53 	neededBy *job
     54 }
     55 
     56 type worker struct {
     57 	wm       *workerManager
     58 	jobChan  chan *job
     59 	waitChan chan bool
     60 	doneChan chan bool
     61 }
     62 
     63 type jobQueue []*job
     64 
     65 func (jq jobQueue) Len() int      { return len(jq) }
     66 func (jq jobQueue) Swap(i, j int) { jq[i], jq[j] = jq[j], jq[i] }
     67 
     68 func (jq jobQueue) Less(i, j int) bool {
     69 	// First come, first serve, for GNU make compatibility.
     70 	return jq[i].id < jq[j].id
     71 }
     72 
     73 func (jq *jobQueue) Push(x interface{}) {
     74 	item := x.(*job)
     75 	*jq = append(*jq, item)
     76 }
     77 
     78 func (jq *jobQueue) Pop() interface{} {
     79 	old := *jq
     80 	n := len(old)
     81 	item := old[n-1]
     82 	*jq = old[0 : n-1]
     83 	return item
     84 }
     85 
     86 func newWorker(wm *workerManager) *worker {
     87 	w := &worker{
     88 		wm:       wm,
     89 		jobChan:  make(chan *job),
     90 		waitChan: make(chan bool),
     91 		doneChan: make(chan bool),
     92 	}
     93 	return w
     94 }
     95 
     96 func (w *worker) Run() {
     97 	done := false
     98 	for !done {
     99 		select {
    100 		case j := <-w.jobChan:
    101 			err := j.build()
    102 			w.wm.ReportResult(w, j, err)
    103 		case done = <-w.waitChan:
    104 		}
    105 	}
    106 	w.doneChan <- true
    107 }
    108 
    109 func (w *worker) PostJob(j *job) {
    110 	w.jobChan <- j
    111 }
    112 
    113 func (w *worker) Wait() {
    114 	w.waitChan <- true
    115 	<-w.doneChan
    116 }
    117 
    118 func (j *job) createRunners() ([]runner, error) {
    119 	runners, _, err := createRunners(j.ex.ctx, j.n)
    120 	return runners, err
    121 }
    122 
    123 // TODO(ukai): use time.Time?
    124 func getTimestamp(filename string) int64 {
    125 	st, err := os.Stat(filename)
    126 	if err != nil {
    127 		return -2
    128 	}
    129 	return st.ModTime().Unix()
    130 }
    131 
    132 func (j *job) build() error {
    133 	if j.n.IsPhony {
    134 		j.outputTs = -2 // trigger cmd even if all inputs don't exist.
    135 	} else {
    136 		j.outputTs = getTimestamp(j.n.Output)
    137 	}
    138 
    139 	if !j.n.HasRule {
    140 		if j.outputTs >= 0 || j.n.IsPhony {
    141 			return errNothingDone
    142 		}
    143 		if len(j.parents) == 0 {
    144 			return fmt.Errorf("*** No rule to make target %q.", j.n.Output)
    145 		}
    146 		return fmt.Errorf("*** No rule to make target %q, needed by %q.", j.n.Output, j.parents[0].n.Output)
    147 	}
    148 
    149 	if j.outputTs >= j.depsTs {
    150 		// TODO: stats.
    151 		return errNothingDone
    152 	}
    153 
    154 	rr, err := j.createRunners()
    155 	if err != nil {
    156 		return err
    157 	}
    158 	if len(rr) == 0 {
    159 		return errNothingDone
    160 	}
    161 	for _, r := range rr {
    162 		err := r.run(j.n.Output)
    163 		glog.Warningf("cmd result for %q: %v", j.n.Output, err)
    164 		if err != nil {
    165 			exit := exitStatus(err)
    166 			return fmt.Errorf("*** [%s] Error %d", j.n.Output, exit)
    167 		}
    168 	}
    169 
    170 	if j.n.IsPhony {
    171 		j.outputTs = time.Now().Unix()
    172 	} else {
    173 		j.outputTs = getTimestamp(j.n.Output)
    174 		if j.outputTs < 0 {
    175 			j.outputTs = time.Now().Unix()
    176 		}
    177 	}
    178 	return nil
    179 }
    180 
    181 func (wm *workerManager) handleJobs() error {
    182 	for {
    183 		if len(wm.freeWorkers) == 0 {
    184 			return nil
    185 		}
    186 		if wm.readyQueue.Len() == 0 {
    187 			return nil
    188 		}
    189 		j := heap.Pop(&wm.readyQueue).(*job)
    190 		glog.V(1).Infof("run: %s", j.n.Output)
    191 
    192 		j.numDeps = -1 // Do not let other workers pick this.
    193 		w := wm.freeWorkers[0]
    194 		wm.freeWorkers = wm.freeWorkers[1:]
    195 		wm.busyWorkers[w] = true
    196 		w.jobChan <- j
    197 	}
    198 }
    199 
    200 func (wm *workerManager) updateParents(j *job) {
    201 	for _, p := range j.parents {
    202 		p.numDeps--
    203 		glog.V(1).Infof("child: %s (%d)", p.n.Output, p.numDeps)
    204 		if p.depsTs < j.outputTs {
    205 			p.depsTs = j.outputTs
    206 		}
    207 		wm.maybePushToReadyQueue(p)
    208 	}
    209 }
    210 
    211 type workerManager struct {
    212 	maxJobs     int
    213 	jobs        []*job
    214 	readyQueue  jobQueue
    215 	jobChan     chan *job
    216 	resultChan  chan jobResult
    217 	newDepChan  chan newDep
    218 	stopChan    chan bool
    219 	waitChan    chan bool
    220 	doneChan    chan error
    221 	freeWorkers []*worker
    222 	busyWorkers map[*worker]bool
    223 	ex          *Executor
    224 	runnings    map[string]*job
    225 
    226 	finishCnt int
    227 	skipCnt   int
    228 }
    229 
    230 func newWorkerManager(numJobs int) (*workerManager, error) {
    231 	wm := &workerManager{
    232 		maxJobs:     numJobs,
    233 		jobChan:     make(chan *job),
    234 		resultChan:  make(chan jobResult),
    235 		newDepChan:  make(chan newDep),
    236 		stopChan:    make(chan bool),
    237 		waitChan:    make(chan bool),
    238 		doneChan:    make(chan error),
    239 		busyWorkers: make(map[*worker]bool),
    240 	}
    241 
    242 	wm.busyWorkers = make(map[*worker]bool)
    243 	for i := 0; i < numJobs; i++ {
    244 		w := newWorker(wm)
    245 		wm.freeWorkers = append(wm.freeWorkers, w)
    246 		go w.Run()
    247 	}
    248 	heap.Init(&wm.readyQueue)
    249 	go wm.Run()
    250 	return wm, nil
    251 }
    252 
    253 func exitStatus(err error) int {
    254 	if err == nil {
    255 		return 0
    256 	}
    257 	exit := 1
    258 	if err, ok := err.(*exec.ExitError); ok {
    259 		if w, ok := err.ProcessState.Sys().(syscall.WaitStatus); ok {
    260 			return w.ExitStatus()
    261 		}
    262 	}
    263 	return exit
    264 }
    265 
    266 func (wm *workerManager) hasTodo() bool {
    267 	return wm.finishCnt != len(wm.jobs)
    268 }
    269 
    270 func (wm *workerManager) maybePushToReadyQueue(j *job) {
    271 	if j.numDeps != 0 {
    272 		return
    273 	}
    274 	heap.Push(&wm.readyQueue, j)
    275 	glog.V(1).Infof("ready: %s", j.n.Output)
    276 }
    277 
    278 func (wm *workerManager) handleNewDep(j *job, neededBy *job) {
    279 	if j.numDeps < 0 {
    280 		neededBy.numDeps--
    281 		if neededBy.id > 0 {
    282 			panic("FIXME: already in WM... can this happen?")
    283 		}
    284 	} else {
    285 		j.parents = append(j.parents, neededBy)
    286 	}
    287 }
    288 
// Run is the manager's event loop, started on its own goroutine by
// newWorkerManager. It handles one event at a time: a posted job, a
// dependency edge, a worker result, or Wait's request. After each event it
// dispatches whatever is ready via handleJobs.
//
// The loop ends when Wait has been called and nothing remains: no
// unfinished jobs, no busy workers, and an empty runnings map. It also ends
// as soon as a job fails. Afterwards Run waits for Wait (if it has not
// arrived yet), shuts down every worker, and sends the final error (nil on
// success) to Wait over doneChan.
func (wm *workerManager) Run() {
	done := false // becomes true once Wait has sent on waitChan
	var err error
Loop:
	for wm.hasTodo() || len(wm.busyWorkers) > 0 || len(wm.runnings) > 0 || !done {
		select {
		case j := <-wm.jobChan:
			// Newly posted job: assign a 1-based id in arrival order (the
			// ready heap's FIFO key) and queue it if it is already ready.
			glog.V(1).Infof("wait: %s (%d)", j.n.Output, j.numDeps)
			j.id = len(wm.jobs) + 1
			wm.jobs = append(wm.jobs, j)
			wm.maybePushToReadyQueue(j)
		case jr := <-wm.resultChan:
			// A worker finished: return it to the free pool and let the
			// jobs waiting on this one see its completion.
			glog.V(1).Infof("done: %s", jr.j.n.Output)
			delete(wm.busyWorkers, jr.w)
			wm.freeWorkers = append(wm.freeWorkers, jr.w)
			wm.updateParents(jr.j)
			wm.finishCnt++
			// errNothingDone counts as a skip, not a failure.
			if jr.err == errNothingDone {
				wm.skipCnt++
				jr.err = nil
			}
			if jr.err != nil {
				err = jr.err
				// Closing stopChan releases goroutines blocked in PostJob,
				// ReportResult or ReportNewDep.
				close(wm.stopChan)
				break Loop
			}
		case af := <-wm.newDepChan:
			wm.handleNewDep(af.j, af.neededBy)
			glog.V(1).Infof("dep: %s (%d) %s", af.neededBy.n.Output, af.neededBy.numDeps, af.j.n.Output)
		case done = <-wm.waitChan:
		}
		// NOTE(review): handleJobs currently always returns nil. If it ever
		// fails, stopChan is not closed on this path. A busy worker blocked
		// in ReportResult would then never return, and the w.Wait() calls
		// below would block forever.
		err = wm.handleJobs()
		if err != nil {
			break Loop
		}

		glog.V(1).Infof("job=%d ready=%d free=%d busy=%d", len(wm.jobs)-wm.finishCnt, wm.readyQueue.Len(), len(wm.freeWorkers), len(wm.busyWorkers))
	}
	// If the loop was cut short before Wait arrived, block until it does so
	// that the final send on doneChan has a receiver.
	if !done {
		<-wm.waitChan
	}

	// Stop every worker. busyWorkers is non-empty only when the loop was cut
	// short; such a worker finishes its current job before seeing Wait.
	for _, w := range wm.freeWorkers {
		w.Wait()
	}
	for w := range wm.busyWorkers {
		w.Wait()
	}
	wm.doneChan <- err
}
    339 
    340 func (wm *workerManager) PostJob(j *job) error {
    341 	select {
    342 	case wm.jobChan <- j:
    343 		return nil
    344 	case <-wm.stopChan:
    345 		return errors.New("worker manager stopped")
    346 	}
    347 }
    348 
    349 func (wm *workerManager) ReportResult(w *worker, j *job, err error) {
    350 	select {
    351 	case wm.resultChan <- jobResult{w: w, j: j, err: err}:
    352 	case <-wm.stopChan:
    353 	}
    354 }
    355 
    356 func (wm *workerManager) ReportNewDep(j *job, neededBy *job) {
    357 	select {
    358 	case wm.newDepChan <- newDep{j: j, neededBy: neededBy}:
    359 	case <-wm.stopChan:
    360 	}
    361 }
    362 
    363 func (wm *workerManager) Wait() (int, error) {
    364 	wm.waitChan <- true
    365 	err := <-wm.doneChan
    366 	glog.V(2).Infof("finish %d skip %d", wm.finishCnt, wm.skipCnt)
    367 	return wm.finishCnt - wm.skipCnt, err
    368 }
    369