Fix race in reload action

Fix #4070
This commit is contained in:
Junegunn Choi 2024-10-31 19:40:40 +09:00
parent 82ebcd9209
commit dcb4c3d84a
No known key found for this signature in database
GPG Key ID: 254BC280FEF9C627
2 changed files with 65 additions and 41 deletions

View File

@ -299,7 +299,7 @@ func Run(opts *Options) (int, error) {
itemIndex = 0 itemIndex = 0
inputRevision.bumpMajor() inputRevision.bumpMajor()
header = make([]string, 0, opts.HeaderLines) header = make([]string, 0, opts.HeaderLines)
go reader.restart(command, environ) reader.restart(command, environ)
} }
exitCode := ExitOk exitCode := ExitOk

View File

@ -25,16 +25,24 @@ type Reader struct {
event int32 event int32
finChan chan bool finChan chan bool
mutex sync.Mutex mutex sync.Mutex
exec *exec.Cmd termFunc func()
execOut io.ReadCloser
command *string command *string
killed bool
wait bool wait bool
} }
// NewReader returns new Reader object // NewReader returns new Reader object
func NewReader(pusher func([]byte) bool, eventBox *util.EventBox, executor *util.Executor, delimNil bool, wait bool) *Reader { func NewReader(pusher func([]byte) bool, eventBox *util.EventBox, executor *util.Executor, delimNil bool, wait bool) *Reader {
return &Reader{pusher, executor, eventBox, delimNil, int32(EvtReady), make(chan bool, 1), sync.Mutex{}, nil, nil, nil, false, wait} return &Reader{
pusher,
executor,
eventBox,
delimNil,
int32(EvtReady),
make(chan bool, 1),
sync.Mutex{},
func() { os.Stdin.Close() },
nil,
wait}
} }
func (r *Reader) startEventPoller() { func (r *Reader) startEventPoller() {
@ -61,6 +69,10 @@ func (r *Reader) startEventPoller() {
}() }()
} }
func (r *Reader) wasKilled() bool {
return r.termFunc == nil
}
func (r *Reader) fin(success bool) { func (r *Reader) fin(success bool) {
atomic.StoreInt32(&r.event, int32(EvtReadFin)) atomic.StoreInt32(&r.event, int32(EvtReadFin))
if r.wait { if r.wait {
@ -69,7 +81,7 @@ func (r *Reader) fin(success bool) {
r.mutex.Lock() r.mutex.Lock()
ret := r.command ret := r.command
if success || r.killed { if success || r.wasKilled() {
ret = nil ret = nil
} }
r.mutex.Unlock() r.mutex.Unlock()
@ -79,12 +91,9 @@ func (r *Reader) fin(success bool) {
func (r *Reader) terminate() { func (r *Reader) terminate() {
r.mutex.Lock() r.mutex.Lock()
r.killed = true if r.termFunc != nil {
if r.exec != nil && r.exec.Process != nil { r.termFunc()
r.execOut.Close() r.termFunc = nil
util.KillCommand(r.exec)
} else {
os.Stdin.Close()
} }
r.mutex.Unlock() r.mutex.Unlock()
} }
@ -92,9 +101,17 @@ func (r *Reader) terminate() {
func (r *Reader) restart(command commandSpec, environ []string) { func (r *Reader) restart(command commandSpec, environ []string) {
r.event = int32(EvtReady) r.event = int32(EvtReady)
r.startEventPoller() r.startEventPoller()
success := r.readFromCommand(command.command, environ)
r.fin(success) r.mutex.Lock()
removeFiles(command.tempFiles) defer r.mutex.Unlock()
if exec, execOut := r.startCommand(command.command, environ); exec != nil {
go func() {
success := r.feedCommandOutput(exec, execOut)
r.fin(success)
removeFiles(command.tempFiles)
}()
}
} }
func (r *Reader) readChannel(inputChan chan string) bool { func (r *Reader) readChannel(inputChan chan string) bool {
@ -249,7 +266,6 @@ func trimPath(path string) string {
} }
func (r *Reader) readFiles(root string, opts walkerOpts, ignores []string) bool { func (r *Reader) readFiles(root string, opts walkerOpts, ignores []string) bool {
r.killed = false
conf := fastwalk.Config{ conf := fastwalk.Config{
Follow: opts.follow, Follow: opts.follow,
// Use forward slashes when running a Windows binary under WSL or MSYS // Use forward slashes when running a Windows binary under WSL or MSYS
@ -280,7 +296,7 @@ func (r *Reader) readFiles(root string, opts walkerOpts, ignores []string) bool
} }
r.mutex.Lock() r.mutex.Lock()
defer r.mutex.Unlock() defer r.mutex.Unlock()
if r.killed { if r.wasKilled() {
return context.Canceled return context.Canceled
} }
return nil return nil
@ -288,31 +304,39 @@ func (r *Reader) readFiles(root string, opts walkerOpts, ignores []string) bool
return fastwalk.Walk(&conf, root, fn) == nil return fastwalk.Walk(&conf, root, fn) == nil
} }
// Should be called with the mutex held
func (r *Reader) startCommand(command string, environ []string) (*exec.Cmd, io.ReadCloser) {
r.termFunc = nil
r.command = &command
exec := r.executor.ExecCommand(command, true)
if environ != nil {
exec.Env = environ
}
execOut, err := exec.StdoutPipe()
if err != nil || exec.Start() != nil {
return nil, nil
}
// Function to call to terminate the running command
r.termFunc = func() {
execOut.Close()
util.KillCommand(exec)
}
return exec, execOut
}
func (r *Reader) feedCommandOutput(exec *exec.Cmd, execOut io.ReadCloser) bool {
r.feed(execOut)
return exec.Wait() == nil
}
func (r *Reader) readFromCommand(command string, environ []string) bool { func (r *Reader) readFromCommand(command string, environ []string) bool {
r.mutex.Lock() r.mutex.Lock()
r.killed = false exec, execOut := r.startCommand(command, environ)
r.command = &command
r.exec = r.executor.ExecCommand(command, true)
if environ != nil {
r.exec.Env = environ
}
var err error
r.execOut, err = r.exec.StdoutPipe()
if err != nil {
r.exec = nil
r.mutex.Unlock()
return false
}
err = r.exec.Start()
if err != nil {
r.exec = nil
r.mutex.Unlock()
return false
}
r.mutex.Unlock() r.mutex.Unlock()
r.feed(r.execOut) if exec == nil {
return r.exec.Wait() == nil return false
}
return r.feedCommandOutput(exec, execOut)
} }