Make Reader event notification asynchronous

Instead of notifying the event coordinator (EventBox) whenever a new
line is arrived, start a background goroutine that periodically does the
task. Atomic.StoreInt32 is much cheaper than mutex synchronization
that happens during EventBox update.
This commit is contained in:
Junegunn Choi 2017-08-16 03:24:23 +09:00
parent 0d171ba1d8
commit 487c8fe88f
No known key found for this signature in database
GPG Key ID: 254BC280FEF9C627
4 changed files with 64 additions and 18 deletions

View File

@ -16,7 +16,10 @@ const (
coordinatorDelayStep time.Duration = 10 * time.Millisecond
// Reader
readerBufferSize = 64 * 1024
readerBufferSize = 64 * 1024
readerPollIntervalMin = 10 * time.Millisecond
readerPollIntervalStep = 5 * time.Millisecond
readerPollIntervalMax = 50 * time.Millisecond
// Terminal
initialDelay = 20 * time.Millisecond
@ -68,7 +71,7 @@ const (
EvtSearchProgress
EvtSearchFin
EvtHeader
EvtClose
EvtReady
)
const (

View File

@ -115,9 +115,9 @@ func Run(opts *Options, revision string) {
// Reader
streamingFilter := opts.Filter != nil && !sort && !opts.Tac && !opts.Sync
if !streamingFilter {
reader := Reader{func(data []byte) bool {
reader := NewReader(func(data []byte) bool {
return chunkList.Push(data)
}, eventBox, opts.ReadZero}
}, eventBox, opts.ReadZero)
go reader.ReadSource()
}
@ -150,7 +150,7 @@ func Run(opts *Options, revision string) {
found := false
if streamingFilter {
slab := util.MakeSlab(slab16Size, slab32Size)
reader := Reader{
reader := NewReader(
func(runes []byte) bool {
item := Item{}
if chunkList.trans(&item, runes, 0) {
@ -160,7 +160,7 @@ func Run(opts *Options, revision string) {
}
}
return false
}, eventBox, opts.ReadZero}
}, eventBox, opts.ReadZero)
reader.ReadSource()
} else {
eventBox.Unwatch(EvtReadNew)

View File

@ -4,6 +4,8 @@ import (
"bufio"
"io"
"os"
"sync/atomic"
"time"
"github.com/junegunn/fzf/src/util"
)
@ -13,10 +15,43 @@ type Reader struct {
pusher func([]byte) bool
eventBox *util.EventBox
delimNil bool
event int32
}
// NewReader returns new Reader object
func NewReader(pusher func([]byte) bool, eventBox *util.EventBox, delimNil bool) *Reader {
return &Reader{pusher, eventBox, delimNil, int32(EvtReady)}
}
func (r *Reader) startEventPoller() {
go func() {
ptr := &r.event
pollInterval := readerPollIntervalMin
for {
if atomic.CompareAndSwapInt32(ptr, int32(EvtReadNew), int32(EvtReady)) {
r.eventBox.Set(EvtReadNew, true)
pollInterval = readerPollIntervalMin
} else if atomic.LoadInt32(ptr) == int32(EvtReadFin) {
return
} else {
pollInterval += readerPollIntervalStep
if pollInterval > readerPollIntervalMax {
pollInterval = readerPollIntervalMax
}
}
time.Sleep(pollInterval)
}
}()
}
func (r *Reader) fin(success bool) {
atomic.StoreInt32(&r.event, int32(EvtReadFin))
r.eventBox.Set(EvtReadFin, success)
}
// ReadSource reads data from the default command or from standard input
func (r *Reader) ReadSource() {
r.startEventPoller()
var success bool
if util.IsTty() {
cmd := os.Getenv("FZF_DEFAULT_COMMAND")
@ -27,7 +62,7 @@ func (r *Reader) ReadSource() {
} else {
success = r.readFromStdin()
}
r.eventBox.Set(EvtReadFin, success)
r.fin(success)
}
func (r *Reader) feed(src io.Reader) {
@ -51,7 +86,7 @@ func (r *Reader) feed(src io.Reader) {
}
}
if r.pusher(bytea) {
r.eventBox.Set(EvtReadNew, true)
atomic.StoreInt32(&r.event, int32(EvtReadNew))
}
}
if err != nil {

View File

@ -2,6 +2,7 @@ package fzf
import (
"testing"
"time"
"github.com/junegunn/fzf/src/util"
)
@ -11,7 +12,10 @@ func TestReadFromCommand(t *testing.T) {
eb := util.NewEventBox()
reader := Reader{
pusher: func(s []byte) bool { strs = append(strs, string(s)); return true },
eventBox: eb}
eventBox: eb,
event: int32(EvtReady)}
reader.startEventPoller()
// Check EventBox
if eb.Peek(EvtReadNew) {
@ -19,21 +23,16 @@ func TestReadFromCommand(t *testing.T) {
}
// Normal command
reader.readFromCommand(`echo abc && echo def`)
reader.fin(reader.readFromCommand(`echo abc && echo def`))
if len(strs) != 2 || strs[0] != "abc" || strs[1] != "def" {
t.Errorf("%s", strs)
}
// Check EventBox again
if !eb.Peek(EvtReadNew) {
t.Error("EvtReadNew should be set yet")
}
eb.WaitFor(EvtReadFin)
// Wait should return immediately
eb.Wait(func(events *util.Events) {
if _, found := (*events)[EvtReadNew]; !found {
t.Errorf("%s", events)
}
events.Clear()
})
@ -42,8 +41,14 @@ func TestReadFromCommand(t *testing.T) {
t.Error("EvtReadNew should not be set yet")
}
// Make sure that event poller is finished
time.Sleep(readerPollIntervalMax)
// Restart event poller
reader.startEventPoller()
// Failing command
reader.readFromCommand(`no-such-command`)
reader.fin(reader.readFromCommand(`no-such-command`))
strs = []string{}
if len(strs) > 0 {
t.Errorf("%s", strs)
@ -51,6 +56,9 @@ func TestReadFromCommand(t *testing.T) {
// Check EventBox again
if eb.Peek(EvtReadNew) {
t.Error("Command failed. EvtReadNew should be set")
t.Error("Command failed. EvtReadNew should not be set")
}
if !eb.Peek(EvtReadFin) {
t.Error("EvtReadFin should be set")
}
}