Home | History | Annotate | Download | only in interactors
      1 package interactors
      2 
      3 import (
      4 	"sync"
      5 
      6 	"github.com/pkg/errors"
      7 )
      8 
      9 type taskRunner struct {
     10 	errorChan chan error
     11 	sync.Mutex
     12 	sync.WaitGroup
     13 }
     14 
     15 func NewTaskRunner() *taskRunner {
     16 	return &taskRunner{}
     17 }
     18 
     19 func (t *taskRunner) ExecuteFunctionsAsync(functions []func() error) error {
     20 	t.Lock()
     21 	defer t.Unlock()
     22 	t.errorChan = make(chan error)
     23 	t.spawnTasksAsync(
     24 		t.syncErrorFnToAsync(functions),
     25 	)
     26 	go t.closeErrorChanOnComplete()
     27 	return t.breakOnError()
     28 }
     29 
     30 func (t *taskRunner) breakOnError() error {
     31 	return <-t.errorChan
     32 }
     33 
     34 func (t *taskRunner) syncErrorFnToAsync(functions []func() error) []func() {
     35 	transformed := make([]func(), len(functions))
     36 	for i, fn := range functions {
     37 		transformed[i] = t.redirectErrToChannel(fn)
     38 	}
     39 	return transformed
     40 }
     41 
     42 func (t *taskRunner) closeErrorChanOnComplete() {
     43 	t.Wait()
     44 	close(t.errorChan)
     45 }
     46 
     47 func (t *taskRunner) spawnTasksAsync(tasks []func()) {
     48 	t.Add(len(tasks))
     49 	for _, task := range tasks {
     50 		go task()
     51 	}
     52 }
     53 
     54 func (t *taskRunner) redirectErrToChannel(f func() error) func() {
     55 	return func() {
     56 		defer t.Done()
     57 		err := f()
     58 		if err != nil && t.errorChan != nil {
     59 			t.errorChan <- errors.Wrap(err, "Error redirected to channel")
     60 		}
     61 	}
     62 }
     63