Avoid buffering the entire file list during walks

This commit is contained in:
Jakob Borg 2014-07-15 14:27:46 +02:00
parent 8b349945de
commit 32a5e83612
3 changed files with 62 additions and 24 deletions

View File

@ -290,6 +290,7 @@ func main() {
rateBucket = ratelimit.NewBucketWithRate(float64(1000*cfg.Options.MaxSendKbps), int64(5*1000*cfg.Options.MaxSendKbps)) rateBucket = ratelimit.NewBucketWithRate(float64(1000*cfg.Options.MaxSendKbps), int64(5*1000*cfg.Options.MaxSendKbps))
} }
removeLegacyIndexes()
db, err := leveldb.OpenFile(filepath.Join(confDir, "index"), nil) db, err := leveldb.OpenFile(filepath.Join(confDir, "index"), nil)
if err != nil { if err != nil {
l.Fatalln("leveldb.OpenFile():", err) l.Fatalln("leveldb.OpenFile():", err)
@ -519,7 +520,12 @@ func resetRepositories() {
} }
} }
pat := filepath.Join(confDir, "*.idx.gz") idx := filepath.Join(confDir, "index")
os.RemoveAll(idx)
}
func removeLegacyIndexes() {
pat := filepath.Join(confDir, "*.idx.gz*")
idxs, err := filepath.Glob(pat) idxs, err := filepath.Glob(pat)
if err == nil { if err == nil {
for _, idx := range idxs { for _, idx := range idxs {

View File

@ -701,8 +701,11 @@ func (m *Model) CleanRepos() {
func (m *Model) ScanRepo(repo string) error { func (m *Model) ScanRepo(repo string) error {
m.rmut.RLock() m.rmut.RLock()
fs := m.repoFiles[repo]
dir := m.repoCfgs[repo].Directory
w := &scanner.Walker{ w := &scanner.Walker{
Dir: m.repoCfgs[repo].Directory, Dir: dir,
IgnoreFile: ".stignore", IgnoreFile: ".stignore",
BlockSize: scanner.StandardBlockSize, BlockSize: scanner.StandardBlockSize,
TempNamer: defTempNamer, TempNamer: defTempNamer,
@ -711,12 +714,47 @@ func (m *Model) ScanRepo(repo string) error {
IgnorePerms: m.repoCfgs[repo].IgnorePerms, IgnorePerms: m.repoCfgs[repo].IgnorePerms,
} }
m.rmut.RUnlock() m.rmut.RUnlock()
m.setState(repo, RepoScanning) m.setState(repo, RepoScanning)
fs, _, err := w.Walk() fchan, _, err := w.Walk()
if err != nil { if err != nil {
return err return err
} }
m.ReplaceLocal(repo, fs) batch := make([]protocol.FileInfo, 0, indexBatchSize)
for f := range fchan {
if len(batch) == indexBatchSize {
fs.Update(protocol.LocalNodeID, batch)
batch = batch[:0]
}
batch = append(batch, f)
}
if len(batch) > 0 {
fs.Update(protocol.LocalNodeID, batch)
}
batch = batch[:0]
fs.WithHave(protocol.LocalNodeID, func(f protocol.FileInfo) bool {
if !protocol.IsDeleted(f.Flags) {
if len(batch) == indexBatchSize {
fs.Update(protocol.LocalNodeID, batch)
batch = batch[:0]
}
if _, err := os.Stat(filepath.Join(dir, f.Name)); err != nil && os.IsNotExist(err) {
// File has been deleted
f.Blocks = nil
f.Flags |= protocol.FlagDeleted
f.Version = lamport.Default.Tick(f.Version)
f.LocalVersion = 0
batch = append(batch, f)
}
}
return true
})
if len(batch) > 0 {
fs.Update(protocol.LocalNodeID, batch)
}
m.setState(repo, RepoIdle) m.setState(repo, RepoIdle)
return nil return nil
} }
@ -790,6 +828,7 @@ func (m *Model) Override(repo string) {
*f = h *f = h
} }
f.Version = lamport.Default.Tick(f.Version) f.Version = lamport.Default.Tick(f.Version)
f.LocalVersion = 0
} }
r.Update(protocol.LocalNodeID, fs) r.Update(protocol.LocalNodeID, fs)

View File

@ -6,7 +6,6 @@ package scanner
import ( import (
"bytes" "bytes"
"code.google.com/p/go.text/unicode/norm"
"errors" "errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
@ -15,6 +14,7 @@ import (
"runtime" "runtime"
"strings" "strings"
"time" "time"
"code.google.com/p/go.text/unicode/norm"
"github.com/calmh/syncthing/lamport" "github.com/calmh/syncthing/lamport"
"github.com/calmh/syncthing/protocol" "github.com/calmh/syncthing/protocol"
@ -60,7 +60,7 @@ type CurrentFiler interface {
// Walk returns the list of files found in the local repository by scanning the // Walk returns the list of files found in the local repository by scanning the
// file system. Files are blockwise hashed. // file system. Files are blockwise hashed.
func (w *Walker) Walk() (files []protocol.FileInfo, ignore map[string][]string, err error) { func (w *Walker) Walk() (files chan protocol.FileInfo, ignore map[string][]string, err error) {
if debug { if debug {
l.Debugln("Walk", w.Dir, w.BlockSize, w.IgnoreFile) l.Debugln("Walk", w.Dir, w.BlockSize, w.IgnoreFile)
} }
@ -70,21 +70,16 @@ func (w *Walker) Walk() (files []protocol.FileInfo, ignore map[string][]string,
return return
} }
t0 := time.Now()
ignore = make(map[string][]string) ignore = make(map[string][]string)
hashFiles := w.walkAndHashFiles(&files, ignore) files = make(chan protocol.FileInfo)
hashFiles := w.walkAndHashFiles(files, ignore)
go func() {
filepath.Walk(w.Dir, w.loadIgnoreFiles(w.Dir, ignore)) filepath.Walk(w.Dir, w.loadIgnoreFiles(w.Dir, ignore))
filepath.Walk(w.Dir, hashFiles) filepath.Walk(w.Dir, hashFiles)
close(files)
}()
if debug {
t1 := time.Now()
d := t1.Sub(t0).Seconds()
l.Debugf("Walk in %.02f ms, %.0f files/s", d*1000, float64(len(files))/d)
}
err = checkDir(w.Dir)
return return
} }
@ -122,7 +117,7 @@ func (w *Walker) loadIgnoreFiles(dir string, ign map[string][]string) filepath.W
} }
} }
func (w *Walker) walkAndHashFiles(res *[]protocol.FileInfo, ign map[string][]string) filepath.WalkFunc { func (w *Walker) walkAndHashFiles(fchan chan protocol.FileInfo, ign map[string][]string) filepath.WalkFunc {
return func(p string, info os.FileInfo, err error) error { return func(p string, info os.FileInfo, err error) error {
if err != nil { if err != nil {
if debug { if debug {
@ -175,7 +170,6 @@ func (w *Walker) walkAndHashFiles(res *[]protocol.FileInfo, ign map[string][]str
if debug { if debug {
l.Debugln("unchanged:", cf) l.Debugln("unchanged:", cf)
} }
*res = append(*res, cf)
return nil return nil
} }
} }
@ -195,7 +189,7 @@ func (w *Walker) walkAndHashFiles(res *[]protocol.FileInfo, ign map[string][]str
if debug { if debug {
l.Debugln("dir:", f) l.Debugln("dir:", f)
} }
*res = append(*res, f) fchan <- f
return nil return nil
} }
@ -207,7 +201,6 @@ func (w *Walker) walkAndHashFiles(res *[]protocol.FileInfo, ign map[string][]str
if debug { if debug {
l.Debugln("unchanged:", cf) l.Debugln("unchanged:", cf)
} }
*res = append(*res, cf)
return nil return nil
} }
@ -220,7 +213,7 @@ func (w *Walker) walkAndHashFiles(res *[]protocol.FileInfo, ign map[string][]str
if debug { if debug {
l.Debugln("suppressed:", cf) l.Debugln("suppressed:", cf)
} }
*res = append(*res, cf) fchan <- cf
return nil return nil
} else if prev && !cur { } else if prev && !cur {
l.Infof("Changes to %q are no longer suppressed.", p) l.Infof("Changes to %q are no longer suppressed.", p)
@ -265,7 +258,7 @@ func (w *Walker) walkAndHashFiles(res *[]protocol.FileInfo, ign map[string][]str
Modified: info.ModTime().Unix(), Modified: info.ModTime().Unix(),
Blocks: blocks, Blocks: blocks,
} }
*res = append(*res, f) fchan <- f
} }
return nil return nil