From 4cb4a3ac7f9e5d743442c5bfa9163c8cae54d57e Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Fri, 5 Feb 2016 22:23:34 +0100 Subject: [PATCH] Add separate goroutine that closes the output chan This allows iterating over the output channel without having to start another Goroutine outside of the worker pool. This also removes the need for calling Wait(). --- src/restic/worker/pool.go | 45 +++++++++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/src/restic/worker/pool.go b/src/restic/worker/pool.go index d2afb3257..d2331f587 100644 --- a/src/restic/worker/pool.go +++ b/src/restic/worker/pool.go @@ -1,10 +1,5 @@ package worker -import ( - "fmt" - "sync" -) - // Job is one unit of work. It is given to a Func, and the returned result and // error are stored in Result and Error. type Job struct { @@ -20,33 +15,53 @@ type Func func(job Job, done <-chan struct{}) (result interface{}, err error) type Pool struct { f Func done chan struct{} - wg *sync.WaitGroup jobCh <-chan Job resCh chan<- Job + + numWorkers int + workersExit chan struct{} + allWorkersDone chan struct{} } // New returns a new worker pool with n goroutines, each running the function // f. The workers are started immediately. func New(n int, f Func, jobChan <-chan Job, resultChan chan<- Job) *Pool { p := &Pool{ - f: f, - done: make(chan struct{}), - wg: &sync.WaitGroup{}, - jobCh: jobChan, - resCh: resultChan, + f: f, + done: make(chan struct{}), + workersExit: make(chan struct{}), + allWorkersDone: make(chan struct{}), + numWorkers: n, + jobCh: jobChan, + resCh: resultChan, } for i := 0; i < n; i++ { - p.wg.Add(1) go p.runWorker(i) } + go p.waitForExit() + return p } +// waitForExit receives from p.workersExit until all worker functions have +// exited, then closes the result channel. +func (p *Pool) waitForExit() { + n := p.numWorkers + for n > 0 { + <-p.workersExit + n-- + } + close(p.allWorkersDone) + close(p.resCh) +} + // runWorker runs a worker function. func (p *Pool) runWorker(numWorker int) { - defer p.wg.Done() + defer func() { + p.workersExit <- struct{}{} + }() var ( // enable the input channel when starting up a new goroutine @@ -65,7 +80,6 @@ func (p *Pool) runWorker(numWorker int) { case job, ok = <-inCh: if !ok { - fmt.Printf("in channel closed, worker exiting\n") return } @@ -88,6 +102,5 @@ func (p *Pool) Cancel() { // Wait waits for all worker goroutines to terminate, afterwards the output // channel is closed. func (p *Pool) Wait() { - p.wg.Wait() - close(p.resCh) + <-p.allWorkersDone }