1 package interactors 2 3 import ( 4 "sync" 5 6 "github.com/pkg/errors" 7 ) 8 9 type taskRunner struct { 10 errorChan chan error 11 sync.Mutex 12 sync.WaitGroup 13 } 14 15 func NewTaskRunner() *taskRunner { 16 return &taskRunner{} 17 } 18 19 func (t *taskRunner) ExecuteFunctionsAsync(functions []func() error) error { 20 t.Lock() 21 defer t.Unlock() 22 t.errorChan = make(chan error) 23 t.spawnTasksAsync( 24 t.syncErrorFnToAsync(functions), 25 ) 26 go t.closeErrorChanOnComplete() 27 return t.breakOnError() 28 } 29 30 func (t *taskRunner) breakOnError() error { 31 return <-t.errorChan 32 } 33 34 func (t *taskRunner) syncErrorFnToAsync(functions []func() error) []func() { 35 transformed := make([]func(), len(functions)) 36 for i, fn := range functions { 37 transformed[i] = t.redirectErrToChannel(fn) 38 } 39 return transformed 40 } 41 42 func (t *taskRunner) closeErrorChanOnComplete() { 43 t.Wait() 44 close(t.errorChan) 45 } 46 47 func (t *taskRunner) spawnTasksAsync(tasks []func()) { 48 t.Add(len(tasks)) 49 for _, task := range tasks { 50 go task() 51 } 52 } 53 54 func (t *taskRunner) redirectErrToChannel(f func() error) func() { 55 return func() { 56 defer t.Done() 57 err := f() 58 if err != nil && t.errorChan != nil { 59 t.errorChan <- errors.Wrap(err, "Error redirected to channel") 60 } 61 } 62 } 63