diff --git a/src/constants.go b/src/constants.go index 10df1e7..0f6a32c 100644 --- a/src/constants.go +++ b/src/constants.go @@ -16,7 +16,10 @@ const ( coordinatorDelayStep time.Duration = 10 * time.Millisecond // Reader - readerBufferSize = 64 * 1024 + readerBufferSize = 64 * 1024 + readerPollIntervalMin = 10 * time.Millisecond + readerPollIntervalStep = 5 * time.Millisecond + readerPollIntervalMax = 50 * time.Millisecond // Terminal initialDelay = 20 * time.Millisecond @@ -68,7 +71,7 @@ const ( EvtSearchProgress EvtSearchFin EvtHeader - EvtClose + EvtReady ) const ( diff --git a/src/core.go b/src/core.go index 968d407..61f14f9 100644 --- a/src/core.go +++ b/src/core.go @@ -115,9 +115,9 @@ func Run(opts *Options, revision string) { // Reader streamingFilter := opts.Filter != nil && !sort && !opts.Tac && !opts.Sync if !streamingFilter { - reader := Reader{func(data []byte) bool { + reader := NewReader(func(data []byte) bool { return chunkList.Push(data) - }, eventBox, opts.ReadZero} + }, eventBox, opts.ReadZero) go reader.ReadSource() } @@ -150,7 +150,7 @@ func Run(opts *Options, revision string) { found := false if streamingFilter { slab := util.MakeSlab(slab16Size, slab32Size) - reader := Reader{ + reader := NewReader( func(runes []byte) bool { item := Item{} if chunkList.trans(&item, runes, 0) { @@ -160,7 +160,7 @@ func Run(opts *Options, revision string) { } } return false - }, eventBox, opts.ReadZero} + }, eventBox, opts.ReadZero) reader.ReadSource() } else { eventBox.Unwatch(EvtReadNew) diff --git a/src/reader.go b/src/reader.go index 1572e5d..401b8f0 100644 --- a/src/reader.go +++ b/src/reader.go @@ -4,6 +4,8 @@ import ( "bufio" "io" "os" + "sync/atomic" + "time" "github.com/junegunn/fzf/src/util" ) @@ -13,10 +15,43 @@ type Reader struct { pusher func([]byte) bool eventBox *util.EventBox delimNil bool + event int32 +} + +// NewReader returns new Reader object +func NewReader(pusher func([]byte) bool, eventBox *util.EventBox, delimNil bool) *Reader { + return &Reader{pusher, eventBox, delimNil, int32(EvtReady)} +} + +func (r *Reader) startEventPoller() { + go func() { + ptr := &r.event + pollInterval := readerPollIntervalMin + for { + if atomic.CompareAndSwapInt32(ptr, int32(EvtReadNew), int32(EvtReady)) { + r.eventBox.Set(EvtReadNew, true) + pollInterval = readerPollIntervalMin + } else if atomic.LoadInt32(ptr) == int32(EvtReadFin) { + return + } else { + pollInterval += readerPollIntervalStep + if pollInterval > readerPollIntervalMax { + pollInterval = readerPollIntervalMax + } + } + time.Sleep(pollInterval) + } + }() +} + +func (r *Reader) fin(success bool) { + atomic.StoreInt32(&r.event, int32(EvtReadFin)) + r.eventBox.Set(EvtReadFin, success) } // ReadSource reads data from the default command or from standard input func (r *Reader) ReadSource() { + r.startEventPoller() var success bool if util.IsTty() { cmd := os.Getenv("FZF_DEFAULT_COMMAND") @@ -27,7 +62,7 @@ func (r *Reader) ReadSource() { } else { success = r.readFromStdin() } - r.eventBox.Set(EvtReadFin, success) + r.fin(success) } func (r *Reader) feed(src io.Reader) { @@ -51,7 +86,7 @@ func (r *Reader) feed(src io.Reader) { } } if r.pusher(bytea) { - r.eventBox.Set(EvtReadNew, true) + atomic.StoreInt32(&r.event, int32(EvtReadNew)) } } if err != nil { diff --git a/src/reader_test.go b/src/reader_test.go index d5c218c..82ca6b7 100644 --- a/src/reader_test.go +++ b/src/reader_test.go @@ -2,6 +2,7 @@ package fzf import ( "testing" + "time" "github.com/junegunn/fzf/src/util" ) @@ -11,7 +12,10 @@ func TestReadFromCommand(t *testing.T) { eb := util.NewEventBox() reader := Reader{ pusher: func(s []byte) bool { strs = append(strs, string(s)); return true }, - eventBox: eb} + eventBox: eb, + event: int32(EvtReady)} + + reader.startEventPoller() // Check EventBox if eb.Peek(EvtReadNew) { @@ -19,21 +23,16 @@ func TestReadFromCommand(t *testing.T) { } // Normal command - reader.readFromCommand(`echo abc && echo def`) + reader.fin(reader.readFromCommand(`echo abc && echo def`)) if len(strs) != 2 || strs[0] != "abc" || strs[1] != "def" { t.Errorf("%s", strs) } // Check EventBox again - if !eb.Peek(EvtReadNew) { - t.Error("EvtReadNew should be set yet") - } + eb.WaitFor(EvtReadFin) // Wait should return immediately eb.Wait(func(events *util.Events) { - if _, found := (*events)[EvtReadNew]; !found { - t.Errorf("%s", events) - } events.Clear() }) @@ -42,8 +41,14 @@ func TestReadFromCommand(t *testing.T) { t.Error("EvtReadNew should not be set yet") } + // Make sure that event poller is finished + time.Sleep(readerPollIntervalMax) + + // Restart event poller + reader.startEventPoller() + // Failing command - reader.readFromCommand(`no-such-command`) + reader.fin(reader.readFromCommand(`no-such-command`)) strs = []string{} if len(strs) > 0 { t.Errorf("%s", strs) @@ -51,6 +56,9 @@ func TestReadFromCommand(t *testing.T) { // Check EventBox again if eb.Peek(EvtReadNew) { - t.Error("Command failed. EvtReadNew should be set") + t.Error("Command failed. EvtReadNew should not be set") + } + if !eb.Peek(EvtReadFin) { + t.Error("EvtReadFin should be set") } }