diff --git a/src/core.go b/src/core.go index e0e0271..a452722 100644 --- a/src/core.go +++ b/src/core.go @@ -299,7 +299,7 @@ func Run(opts *Options) (int, error) { itemIndex = 0 inputRevision.bumpMajor() header = make([]string, 0, opts.HeaderLines) - go reader.restart(command, environ) + reader.restart(command, environ) } exitCode := ExitOk diff --git a/src/reader.go b/src/reader.go index 572c9c6..50e7809 100644 --- a/src/reader.go +++ b/src/reader.go @@ -25,16 +25,24 @@ type Reader struct { event int32 finChan chan bool mutex sync.Mutex - exec *exec.Cmd - execOut io.ReadCloser + termFunc func() command *string - killed bool wait bool } // NewReader returns new Reader object func NewReader(pusher func([]byte) bool, eventBox *util.EventBox, executor *util.Executor, delimNil bool, wait bool) *Reader { - return &Reader{pusher, executor, eventBox, delimNil, int32(EvtReady), make(chan bool, 1), sync.Mutex{}, nil, nil, nil, false, wait} + return &Reader{ + pusher, + executor, + eventBox, + delimNil, + int32(EvtReady), + make(chan bool, 1), + sync.Mutex{}, + func() { os.Stdin.Close() }, + nil, + wait} } func (r *Reader) startEventPoller() { @@ -61,6 +69,10 @@ func (r *Reader) startEventPoller() { }() } +func (r *Reader) wasKilled() bool { + return r.termFunc == nil +} + func (r *Reader) fin(success bool) { atomic.StoreInt32(&r.event, int32(EvtReadFin)) if r.wait { @@ -69,7 +81,7 @@ func (r *Reader) fin(success bool) { r.mutex.Lock() ret := r.command - if success || r.killed { + if success || r.wasKilled() { ret = nil } r.mutex.Unlock() @@ -79,12 +91,9 @@ func (r *Reader) fin(success bool) { func (r *Reader) terminate() { r.mutex.Lock() - r.killed = true - if r.exec != nil && r.exec.Process != nil { - r.execOut.Close() - util.KillCommand(r.exec) - } else { - os.Stdin.Close() + if r.termFunc != nil { + r.termFunc() + r.termFunc = nil } r.mutex.Unlock() } @@ -92,9 +101,17 @@ func (r *Reader) terminate() { func (r *Reader) restart(command commandSpec, environ []string) { r.event = int32(EvtReady) r.startEventPoller() - success := r.readFromCommand(command.command, environ) - r.fin(success) - removeFiles(command.tempFiles) + + r.mutex.Lock() + defer r.mutex.Unlock() + + if exec, execOut := r.startCommand(command.command, environ); exec != nil { + go func() { + success := r.feedCommandOutput(exec, execOut) + r.fin(success) + removeFiles(command.tempFiles) + }() + } } func (r *Reader) readChannel(inputChan chan string) bool { @@ -249,7 +266,6 @@ func trimPath(path string) string { } func (r *Reader) readFiles(root string, opts walkerOpts, ignores []string) bool { - r.killed = false conf := fastwalk.Config{ Follow: opts.follow, // Use forward slashes when running a Windows binary under WSL or MSYS @@ -280,7 +296,7 @@ func (r *Reader) readFiles(root string, opts walkerOpts, ignores []string) bool } r.mutex.Lock() defer r.mutex.Unlock() - if r.killed { + if r.wasKilled() { return context.Canceled } return nil @@ -288,31 +304,39 @@ func (r *Reader) readFiles(root string, opts walkerOpts, ignores []string) bool return fastwalk.Walk(&conf, root, fn) == nil } +// Should be called with the mutex held +func (r *Reader) startCommand(command string, environ []string) (*exec.Cmd, io.ReadCloser) { + r.termFunc = nil + r.command = &command + exec := r.executor.ExecCommand(command, true) + if environ != nil { + exec.Env = environ + } + execOut, err := exec.StdoutPipe() + if err != nil || exec.Start() != nil { + return nil, nil + } + + // Function to call to terminate the running command + r.termFunc = func() { + execOut.Close() + util.KillCommand(exec) + } + + return exec, execOut +} + +func (r *Reader) feedCommandOutput(exec *exec.Cmd, execOut io.ReadCloser) bool { + r.feed(execOut) + return exec.Wait() == nil +} + func (r *Reader) readFromCommand(command string, environ []string) bool { r.mutex.Lock() - r.killed = false - r.command = &command - r.exec = r.executor.ExecCommand(command, true) - if environ != nil { - r.exec.Env = environ - } - - var err error - r.execOut, err = r.exec.StdoutPipe() - if err != nil { - r.exec = nil - r.mutex.Unlock() - return false - } - - err = r.exec.Start() - if err != nil { - r.exec = nil - r.mutex.Unlock() - return false - } - + exec, execOut := r.startCommand(command, environ) r.mutex.Unlock() - r.feed(r.execOut) - return r.exec.Wait() == nil + if exec == nil { + return false + } + return r.feedCommandOutput(exec, execOut) }