diff --git a/CHANGELOG.md b/CHANGELOG.md index 2dc315c..d5f66a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,7 @@ CHANGELOG 0.49.0 ------ -- Ingestion performance improved by around 20% +- Ingestion performance improved by around 40% - Added two environment variables exported to the child processes - `FZF_PREVIEW_LABEL` - `FZF_BORDER_LABEL` diff --git a/src/reader.go b/src/reader.go index 3ebe352..85a988b 100644 --- a/src/reader.go +++ b/src/reader.go @@ -1,13 +1,12 @@ package fzf import ( - "bufio" + "bytes" "context" "io" "os" "os/exec" "path/filepath" - "strconv" "sync" "sync/atomic" "time" @@ -112,86 +111,82 @@ func (r *Reader) ReadSource(root string, opts walkerOpts, ignores []string) { } func (r *Reader) feed(src io.Reader) { - readerSlabSize, ae := strconv.Atoi(os.Getenv("SLAB_KB")) - if ae != nil { - readerSlabSize = 128 * 1024 - } else { - readerSlabSize *= 1024 - } - readerBufferSize, be := strconv.Atoi(os.Getenv("BUF_KB")) - if be != nil { - readerBufferSize = 64 * 1024 - } else { - readerBufferSize *= 1024 - } + /* + readerSlabSize, ae := strconv.Atoi(os.Getenv("SLAB_KB")) + if ae != nil { + readerSlabSize = 128 * 1024 + } else { + readerSlabSize *= 1024 + } + readerBufferSize, be := strconv.Atoi(os.Getenv("BUF_KB")) + if be != nil { + readerBufferSize = 64 * 1024 + } else { + readerBufferSize *= 1024 + } + */ - slab := make([]byte, readerSlabSize) - pointer := 0 delim := byte('\n') if r.delimNil { delim = '\000' } - reader := bufio.NewReaderSize(src, readerBufferSize) - - // We do not put a slice longer than 10% of the slab to reduce fragmentation - maxBytes := readerBufferSize / 10 + slab := make([]byte, readerSlabSize) + leftover := []byte{} + var err error for { - var frags [][]byte - fragsLen := 0 - for { - bytea, err := reader.ReadSlice(delim) - if err == bufio.ErrBufferFull { - // Could not find the delimiter in the reader buffer. - // Need to collect the fragments and merge them later. - frags = append(frags, bytea) - fragsLen += len(bytea) - } else { - byteaLen := len(bytea) - if err == nil { - // No errors. Found the delimiter. - if util.IsWindows() && byteaLen >= 2 && bytea[byteaLen-2] == byte('\r') { - bytea = bytea[:byteaLen-2] - byteaLen -= 2 - } else { - bytea = bytea[:byteaLen-1] - byteaLen-- - } - } - - itemLen := fragsLen + byteaLen - pointer += itemLen - var slice []byte - if itemLen <= maxBytes { // We can use the slab - // Allocate a new slab if it doesn't fit - if pointer > readerSlabSize { - slab = make([]byte, readerSlabSize) - pointer = itemLen - } - slice = slab[pointer-itemLen : pointer] - } else { // We can't use the slab because the item is too large - slice = make([]byte, itemLen) - } - - if len(frags) > 0 { - // Collect the fragments - n := 0 - for _, frag := range frags { - n += copy(slice[n:], frag) - } - copy(slice[n:], bytea) - } else if byteaLen > 0 { - copy(slice, bytea) - } - if (err == nil || itemLen > 0) && r.pusher(slice) { - atomic.StoreInt32(&r.event, int32(EvtReadNew)) - } - if err != nil { - return - } + n := 0 + scope := slab[:util.Min(len(slab), readerBufferSize)] + for i := 0; i < 100; i++ { + n, err = src.Read(scope) + if n > 0 || err != nil { break } } + + // We're not making any progress after 100 tries. Stop. + if n == 0 && err == nil { + break + } + + buf := slab[:n] + slab = slab[n:] + + for len(buf) > 0 { + if i := bytes.IndexByte(buf, delim); i >= 0 { + // Found the delimiter + slice := buf[:i+1] + buf = buf[i+1:] + if util.IsWindows() && len(slice) >= 2 && slice[len(slice)-2] == byte('\r') { + slice = slice[:len(slice)-2] + } else { + slice = slice[:len(slice)-1] + } + if len(leftover) > 0 { + slice = append(leftover, slice...) + leftover = []byte{} + } + if (err == nil || len(slice) > 0) && r.pusher(slice) { + atomic.StoreInt32(&r.event, int32(EvtReadNew)) + } + } else { + // Could not find the delimiter in the buffer + leftover = append(leftover, buf...) + break + } + } + + if err == io.EOF { + leftover = append(leftover, buf...) + break + } + + if len(slab) == 0 { + slab = make([]byte, readerSlabSize) + } + } + if len(leftover) > 0 && r.pusher(leftover) { + atomic.StoreInt32(&r.event, int32(EvtReadNew)) } }