diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go index 8a2f9bbc6..952d25edd 100644 --- a/cmd/syncthing/main.go +++ b/cmd/syncthing/main.go @@ -290,6 +290,7 @@ func main() { rateBucket = ratelimit.NewBucketWithRate(float64(1000*cfg.Options.MaxSendKbps), int64(5*1000*cfg.Options.MaxSendKbps)) } + removeLegacyIndexes() db, err := leveldb.OpenFile(filepath.Join(confDir, "index"), nil) if err != nil { l.Fatalln("leveldb.OpenFile():", err) @@ -519,7 +520,12 @@ func resetRepositories() { } } - pat := filepath.Join(confDir, "*.idx.gz") + idx := filepath.Join(confDir, "index") + os.RemoveAll(idx) +} + +func removeLegacyIndexes() { + pat := filepath.Join(confDir, "*.idx.gz*") idxs, err := filepath.Glob(pat) if err == nil { for _, idx := range idxs { diff --git a/model/model.go b/model/model.go index 3f075d3ee..28502c46e 100644 --- a/model/model.go +++ b/model/model.go @@ -701,8 +701,11 @@ func (m *Model) CleanRepos() { func (m *Model) ScanRepo(repo string) error { m.rmut.RLock() + fs := m.repoFiles[repo] + dir := m.repoCfgs[repo].Directory + w := &scanner.Walker{ - Dir: m.repoCfgs[repo].Directory, + Dir: dir, IgnoreFile: ".stignore", BlockSize: scanner.StandardBlockSize, TempNamer: defTempNamer, @@ -711,12 +714,47 @@ func (m *Model) ScanRepo(repo string) error { IgnorePerms: m.repoCfgs[repo].IgnorePerms, } m.rmut.RUnlock() + m.setState(repo, RepoScanning) - fs, _, err := w.Walk() + fchan, _, err := w.Walk() + if err != nil { return err } - m.ReplaceLocal(repo, fs) + batch := make([]protocol.FileInfo, 0, indexBatchSize) + for f := range fchan { + if len(batch) == indexBatchSize { + fs.Update(protocol.LocalNodeID, batch) + batch = batch[:0] + } + batch = append(batch, f) + } + if len(batch) > 0 { + fs.Update(protocol.LocalNodeID, batch) + } + + batch = batch[:0] + fs.WithHave(protocol.LocalNodeID, func(f protocol.FileInfo) bool { + if !protocol.IsDeleted(f.Flags) { + if len(batch) == indexBatchSize { + fs.Update(protocol.LocalNodeID, batch) + batch = batch[:0] + } + if _, err := os.Stat(filepath.Join(dir, f.Name)); err != nil && os.IsNotExist(err) { + // File has been deleted + f.Blocks = nil + f.Flags |= protocol.FlagDeleted + f.Version = lamport.Default.Tick(f.Version) + f.LocalVersion = 0 + batch = append(batch, f) + } + } + return true + }) + if len(batch) > 0 { + fs.Update(protocol.LocalNodeID, batch) + } + m.setState(repo, RepoIdle) return nil } @@ -790,6 +828,7 @@ func (m *Model) Override(repo string) { *f = h } f.Version = lamport.Default.Tick(f.Version) + f.LocalVersion = 0 } r.Update(protocol.LocalNodeID, fs) diff --git a/scanner/walk.go b/scanner/walk.go index cc4725395..2d114439d 100644 --- a/scanner/walk.go +++ b/scanner/walk.go @@ -6,7 +6,6 @@ package scanner import ( "bytes" - "code.google.com/p/go.text/unicode/norm" "errors" "fmt" "io/ioutil" @@ -15,6 +14,7 @@ import ( "runtime" "strings" "time" + "code.google.com/p/go.text/unicode/norm" "github.com/calmh/syncthing/lamport" "github.com/calmh/syncthing/protocol" @@ -60,7 +60,7 @@ type CurrentFiler interface { // Walk returns the list of files found in the local repository by scanning the // file system. Files are blockwise hashed. -func (w *Walker) Walk() (files []protocol.FileInfo, ignore map[string][]string, err error) { +func (w *Walker) Walk() (files chan protocol.FileInfo, ignore map[string][]string, err error) { if debug { l.Debugln("Walk", w.Dir, w.BlockSize, w.IgnoreFile) } @@ -70,21 +70,16 @@ func (w *Walker) Walk() (files []protocol.FileInfo, ignore map[string][]string, return } - t0 := time.Now() - ignore = make(map[string][]string) - hashFiles := w.walkAndHashFiles(&files, ignore) + files = make(chan protocol.FileInfo) + hashFiles := w.walkAndHashFiles(files, ignore) - filepath.Walk(w.Dir, w.loadIgnoreFiles(w.Dir, ignore)) - filepath.Walk(w.Dir, hashFiles) + go func() { + filepath.Walk(w.Dir, w.loadIgnoreFiles(w.Dir, ignore)) + filepath.Walk(w.Dir, hashFiles) + close(files) + }() - if debug { - t1 := time.Now() - d := t1.Sub(t0).Seconds() - l.Debugf("Walk in %.02f ms, %.0f files/s", d*1000, float64(len(files))/d) - } - - err = checkDir(w.Dir) return } @@ -122,7 +117,7 @@ func (w *Walker) loadIgnoreFiles(dir string, ign map[string][]string) filepath.W } } -func (w *Walker) walkAndHashFiles(res *[]protocol.FileInfo, ign map[string][]string) filepath.WalkFunc { +func (w *Walker) walkAndHashFiles(fchan chan protocol.FileInfo, ign map[string][]string) filepath.WalkFunc { return func(p string, info os.FileInfo, err error) error { if err != nil { if debug { @@ -175,7 +170,6 @@ func (w *Walker) walkAndHashFiles(res *[]protocol.FileInfo, ign map[string][]str if debug { l.Debugln("unchanged:", cf) } - *res = append(*res, cf) return nil } } @@ -195,7 +189,7 @@ func (w *Walker) walkAndHashFiles(res *[]protocol.FileInfo, ign map[string][]str if debug { l.Debugln("dir:", f) } - *res = append(*res, f) + fchan <- f return nil } @@ -207,7 +201,6 @@ func (w *Walker) walkAndHashFiles(res *[]protocol.FileInfo, ign map[string][]str if debug { l.Debugln("unchanged:", cf) } - *res = append(*res, cf) return nil } @@ -220,7 +213,7 @@ func (w *Walker) walkAndHashFiles(res *[]protocol.FileInfo, ign map[string][]str if debug { l.Debugln("suppressed:", cf) } - *res = append(*res, cf) + fchan <- cf return nil } else if prev && !cur { l.Infof("Changes to %q are no longer suppressed.", p) @@ -265,7 +258,7 @@ func (w *Walker) walkAndHashFiles(res *[]protocol.FileInfo, ign map[string][]str Modified: info.ModTime().Unix(), Blocks: blocks, } - *res = append(*res, f) + fchan <- f } return nil