Remove unnecessary reader barrier on --filter mode

This commit is contained in:
Junegunn Choi 2019-11-11 12:53:03 +09:00
parent e975bd0c8d
commit 73c0a645e0
No known key found for this signature in database
GPG Key ID: 254BC280FEF9C627
3 changed files with 14 additions and 11 deletions

View File

@ -139,7 +139,7 @@ func Run(opts *Options, revision string) {
if !streamingFilter { if !streamingFilter {
reader = NewReader(func(data []byte) bool { reader = NewReader(func(data []byte) bool {
return chunkList.Push(data) return chunkList.Push(data)
}, eventBox, opts.ReadZero) }, eventBox, opts.ReadZero, opts.Filter == nil)
go reader.ReadSource() go reader.ReadSource()
} }
@ -183,7 +183,7 @@ func Run(opts *Options, revision string) {
} }
} }
return false return false
}, eventBox, opts.ReadZero) }, eventBox, opts.ReadZero, false)
reader.ReadSource() reader.ReadSource()
} else { } else {
eventBox.Unwatch(EvtReadNew) eventBox.Unwatch(EvtReadNew)

View File

@ -23,11 +23,12 @@ type Reader struct {
exec *exec.Cmd exec *exec.Cmd
command *string command *string
killed bool killed bool
wait bool
} }
// NewReader returns new Reader object // NewReader returns new Reader object
func NewReader(pusher func([]byte) bool, eventBox *util.EventBox, delimNil bool) *Reader { func NewReader(pusher func([]byte) bool, eventBox *util.EventBox, delimNil bool, wait bool) *Reader {
return &Reader{pusher, eventBox, delimNil, int32(EvtReady), make(chan bool, 1), sync.Mutex{}, nil, nil, false} return &Reader{pusher, eventBox, delimNil, int32(EvtReady), make(chan bool, 1), sync.Mutex{}, nil, nil, false, wait}
} }
func (r *Reader) startEventPoller() { func (r *Reader) startEventPoller() {
@ -39,7 +40,9 @@ func (r *Reader) startEventPoller() {
r.eventBox.Set(EvtReadNew, (*string)(nil)) r.eventBox.Set(EvtReadNew, (*string)(nil))
pollInterval = readerPollIntervalMin pollInterval = readerPollIntervalMin
} else if atomic.LoadInt32(ptr) == int32(EvtReadFin) { } else if atomic.LoadInt32(ptr) == int32(EvtReadFin) {
r.finChan <- true if r.wait {
r.finChan <- true
}
return return
} else { } else {
pollInterval += readerPollIntervalStep pollInterval += readerPollIntervalStep
@ -54,7 +57,9 @@ func (r *Reader) startEventPoller() {
func (r *Reader) fin(success bool) { func (r *Reader) fin(success bool) {
atomic.StoreInt32(&r.event, int32(EvtReadFin)) atomic.StoreInt32(&r.event, int32(EvtReadFin))
<-r.finChan if r.wait {
<-r.finChan
}
r.mutex.Lock() r.mutex.Lock()
ret := r.command ret := r.command

View File

@ -10,11 +10,9 @@ import (
func TestReadFromCommand(t *testing.T) { func TestReadFromCommand(t *testing.T) {
strs := []string{} strs := []string{}
eb := util.NewEventBox() eb := util.NewEventBox()
reader := Reader{ reader := NewReader(
pusher: func(s []byte) bool { strs = append(strs, string(s)); return true }, func(s []byte) bool { strs = append(strs, string(s)); return true },
finChan: make(chan bool, 1), eb, false, true)
eventBox: eb,
event: int32(EvtReady)}
reader.startEventPoller() reader.startEventPoller()