From 6d3f9d5154b4c6b39f8083255cc9ed36b5c51146 Mon Sep 17 00:00:00 2001 From: Simon Frei Date: Sat, 10 Feb 2018 16:56:53 +0100 Subject: [PATCH] all: Simultaneously walk fs and db on scan (fixes #2571, fixes #4573) (#4584) When scanner.Walk detects a change, it now returns the new file info as well as the old file info. It also finds deleted and ignored files while scanning. Also directory deletions are now always committed to db after their children to prevent temporary failure on remote due to non-empty directory. --- lib/db/set.go | 6 +- lib/fs/walkfs.go | 6 +- lib/model/model.go | 152 +++++-------- lib/model/model_test.go | 179 +++++++++++++++ lib/protocol/bep_extensions.go | 21 ++ lib/protocol/protocol_test.go | 50 +++- lib/scanner/blockqueue.go | 18 +- lib/scanner/walk.go | 404 +++++++++++++++++++++++---------- lib/scanner/walk_test.go | 111 +++++++-- 9 files changed, 685 insertions(+), 262 deletions(-) diff --git a/lib/db/set.go b/lib/db/set.go index afaa36494..71c1a19fd 100644 --- a/lib/db/set.go +++ b/lib/db/set.go @@ -194,9 +194,9 @@ func (s *FileSet) WithHaveTruncated(device protocol.DeviceID, fn Iterator) { s.db.withHave([]byte(s.folder), device[:], nil, true, nativeFileIterator(fn)) } -func (s *FileSet) WithPrefixedHaveTruncated(device protocol.DeviceID, prefix string, fn Iterator) { - l.Debugf("%s WithPrefixedHaveTruncated(%v)", s.folder, device) - s.db.withHave([]byte(s.folder), device[:], []byte(osutil.NormalizedFilename(prefix)), true, nativeFileIterator(fn)) +func (s *FileSet) WithPrefixedHave(device protocol.DeviceID, prefix string, fn Iterator) { + l.Debugf("%s WithPrefixedHave(%v)", s.folder, device) + s.db.withHave([]byte(s.folder), device[:], []byte(osutil.NormalizedFilename(prefix)), false, nativeFileIterator(fn)) } func (s *FileSet) WithGlobal(fn Iterator) { l.Debugf("%s WithGlobal()", s.folder) diff --git a/lib/fs/walkfs.go b/lib/fs/walkfs.go index 005c7e262..d4fa391f6 100644 --- a/lib/fs/walkfs.go +++ b/lib/fs/walkfs.go @@ -10,7 +10,10 @@ package fs -import "path/filepath" +import ( + "path/filepath" + "sort" +) // WalkFunc is the type of the function called for each file or directory // visited by Walk. The path argument contains the argument to Walk as a @@ -54,6 +57,7 @@ func (f *walkFilesystem) walk(path string, info FileInfo, walkFn WalkFunc) error if err != nil { return walkFn(path, info, err) } + sort.Strings(names) for _, name := range names { filename := filepath.Join(path, name) diff --git a/lib/model/model.go b/lib/model/model.go index 304f33e43..b14809ea8 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -1395,14 +1395,21 @@ func (m *Model) CurrentGlobalFile(folder string, file string) (protocol.FileInfo return fs.GetGlobal(file) } -type cFiler struct { - m *Model - r string +type haveWalker struct { + fset *db.FileSet } -// Implements scanner.CurrentFiler -func (cf cFiler) CurrentFile(file string) (protocol.FileInfo, bool) { - return cf.m.CurrentFolderFile(cf.r, file) +func (h haveWalker) Walk(prefix string, ctx context.Context, out chan<- protocol.FileInfo) { + ctxChan := ctx.Done() + h.fset.WithPrefixedHave(protocol.LocalDeviceID, prefix, func(fi db.FileIntf) bool { + f := fi.(protocol.FileInfo) + select { + case out <- f: + case <-ctxChan: + return false + } + return true + }) } // Connection returns the current connection for device, and a boolean wether a connection was found. 
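The haveWalker added above replaces the old per-path CurrentFiler lookup: the folder's database entries are streamed in name order over a channel, and iteration stops as soon as the context is cancelled. For reference, a minimal stand-alone consumer of that pattern (a hypothetical drainHave helper, not part of this patch) would look roughly like this:

// drainHave collects every local file info under prefix into a slice.
// Sketch only; the scanner below consumes the channel incrementally instead.
func drainHave(ctx context.Context, h haveWalker, prefix string) []protocol.FileInfo {
	out := make(chan protocol.FileInfo)
	go func() {
		defer close(out) // Walk returns on completion or context cancellation
		h.Walk(prefix, ctx, out)
	}()
	var have []protocol.FileInfo
	for f := range out {
		have = append(have, f)
	}
	return have
}
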
@@ -1955,13 +1962,14 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su runner.setState(FolderScanning) - fchan := scanner.Walk(ctx, scanner.Config{ + haveWalker := haveWalker{fset} + rchan := scanner.Walk(ctx, scanner.Config{ Folder: folderCfg.ID, Subs: subDirs, Matcher: ignores, BlockSize: protocol.BlockSize, TempLifetime: time.Duration(m.cfg.Options().KeepTemporariesH) * time.Hour, - CurrentFiler: cFiler{m, folder}, + Have: haveWalker, Filesystem: mtimefs, IgnorePerms: folderCfg.IgnorePerms, AutoNormalize: folderCfg.AutoNormalize, @@ -1978,6 +1986,17 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su batch := make([]protocol.FileInfo, 0, maxBatchSizeFiles) batchSizeBytes := 0 changes := 0 + checkBatch := func() error { + if len(batch) == maxBatchSizeFiles || batchSizeBytes > maxBatchSizeBytes { + if err := runner.CheckHealth(); err != nil { + return err + } + m.updateLocalsFromScanning(folder, batch) + batch = batch[:0] + batchSizeBytes = 0 + } + return nil + } // Schedule a pull after scanning, but only if we actually detected any // changes. @@ -1987,98 +2006,49 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su } }() - for f := range fchan { - if len(batch) == maxBatchSizeFiles || batchSizeBytes > maxBatchSizeBytes { - if err := runner.CheckHealth(); err != nil { + var delDirStack []protocol.FileInfo + for r := range rchan { + if err := checkBatch(); err != nil { + l.Debugln("Stopping scan of folder %s due to: %s", folderCfg.Description(), err) + return err + } + + // Append deleted dirs from stack if the current file isn't a child, + // which means all children were already processed. + for len(delDirStack) != 0 && !strings.HasPrefix(r.New.Name, delDirStack[len(delDirStack)-1].Name+string(fs.PathSeparator)) { + lastDelDir := delDirStack[len(delDirStack)-1] + batch = append(batch, lastDelDir) + batchSizeBytes += lastDelDir.ProtoSize() + changes++ + if err := checkBatch(); err != nil { l.Debugln("Stopping scan of folder %s due to: %s", folderCfg.Description(), err) return err } - m.updateLocalsFromScanning(folder, batch) - batch = batch[:0] - batchSizeBytes = 0 + delDirStack = delDirStack[:len(delDirStack)-1] } - batch = append(batch, f) - batchSizeBytes += f.ProtoSize() + // Delay appending deleted dirs until all its children are processed + if r.Old.IsDirectory() && (r.New.Deleted || !r.New.IsDirectory()) { + delDirStack = append(delDirStack, r.New) + continue + } + + l.Debugln("Appending", r) + batch = append(batch, r.New) + batchSizeBytes += r.New.ProtoSize() changes++ } - if err := runner.CheckHealth(); err != nil { - l.Debugln("Stopping scan of folder %s due to: %s", folderCfg.Description(), err) - return err - } else if len(batch) > 0 { - m.updateLocalsFromScanning(folder, batch) - } - - if len(subDirs) == 0 { - // If we have no specific subdirectories to traverse, set it to one - // empty prefix so we traverse the entire folder contents once. - subDirs = []string{""} - } - - // Do a scan of the database for each prefix, to check for deleted and - // ignored files. 
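The delDirStack handling added above defers committing a deleted directory until every result underneath it has been processed, so a remote device never receives a delete for a still non-empty directory. Stripped of batching and health checks, the reordering amounts to a prefix-keyed stack; a simplified sketch (hypothetical reorderDeletes helper covering only the deleted-directory case, not part of this patch) is:

// reorderDeletes emits a deleted directory only after everything below it.
// Sketch only; the real loop also handles directories replaced by files or
// symlinks and flushes batches along the way.
func reorderDeletes(in []protocol.FileInfo) []protocol.FileInfo {
	var out, stack []protocol.FileInfo
	for _, f := range in {
		// Pop deleted parents whose subtree has been fully processed.
		for len(stack) > 0 && !strings.HasPrefix(f.Name, stack[len(stack)-1].Name+string(fs.PathSeparator)) {
			out = append(out, stack[len(stack)-1])
			stack = stack[:len(stack)-1]
		}
		if f.Deleted && f.IsDirectory() {
			stack = append(stack, f)
			continue
		}
		out = append(out, f)
	}
	for i := len(stack) - 1; i >= 0; i-- { // innermost deleted dirs first
		out = append(out, stack[i])
	}
	return out
}
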
- batch = batch[:0] - batchSizeBytes = 0 - for _, sub := range subDirs { - var iterError error - - fset.WithPrefixedHaveTruncated(protocol.LocalDeviceID, sub, func(fi db.FileIntf) bool { - f := fi.(db.FileInfoTruncated) - if len(batch) == maxBatchSizeFiles || batchSizeBytes > maxBatchSizeBytes { - if err := runner.CheckHealth(); err != nil { - iterError = err - return false - } - m.updateLocalsFromScanning(folder, batch) - batch = batch[:0] - batchSizeBytes = 0 - } - - switch { - case !f.IsInvalid() && ignores.Match(f.Name).IsIgnored(): - // File was valid at last pass but has been ignored. Set invalid bit. - l.Debugln("setting invalid bit on ignored", f) - nf := f.ConvertToInvalidFileInfo(m.id.Short()) - batch = append(batch, nf) - batchSizeBytes += nf.ProtoSize() - changes++ - - case !f.IsInvalid() && !f.IsDeleted(): - // The file is valid and not deleted. Lets check if it's - // still here. - - if _, err := mtimefs.Lstat(f.Name); err != nil { - // We don't specifically verify that the error is - // fs.IsNotExist because there is a corner case when a - // directory is suddenly transformed into a file. When that - // happens, files that were in the directory (that is now a - // file) are deleted but will return a confusing error ("not a - // directory") when we try to Lstat() them. - - nf := protocol.FileInfo{ - Name: f.Name, - Type: f.Type, - Size: 0, - ModifiedS: f.ModifiedS, - ModifiedNs: f.ModifiedNs, - ModifiedBy: m.id.Short(), - Deleted: true, - Version: f.Version.Update(m.shortID), - } - - batch = append(batch, nf) - batchSizeBytes += nf.ProtoSize() - changes++ - } - } - return true - }) - - if iterError != nil { - l.Debugln("Stopping scan of folder %s due to: %s", folderCfg.Description(), iterError) - return iterError + // Append remaining deleted dirs. 
+ for i := len(delDirStack) - 1; i >= 0; i-- { + if err := checkBatch(); err != nil { + l.Debugln("Stopping scan of folder %s due to: %s", folderCfg.Description(), err) + return err } + + batch = append(batch, delDirStack[i]) + batchSizeBytes += delDirStack[i].ProtoSize() + changes++ } if err := runner.CheckHealth(); err != nil { diff --git a/lib/model/model_test.go b/lib/model/model_test.go index 309210c97..be3ac0c50 100644 --- a/lib/model/model_test.go +++ b/lib/model/model_test.go @@ -2808,6 +2808,185 @@ func TestNoRequestsFromPausedDevices(t *testing.T) { } } +// TestIssue2571 tests replacing a directory with content with a symlink +func TestIssue2571(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("Symlinks aren't supported by fs and scanner on windows") + } + err := defaultFs.MkdirAll("replaceDir", 0755) + if err != nil { + t.Fatal(err) + } + defer func() { + defaultFs.RemoveAll("replaceDir") + }() + + testFs := fs.NewFilesystem(fs.FilesystemTypeBasic, filepath.Join(defaultFs.URI(), "replaceDir")) + + for _, dir := range []string{"toLink", "linkTarget"} { + err := testFs.MkdirAll(dir, 0775) + if err != nil { + t.Fatal(err) + } + fd, err := testFs.Create(filepath.Join(dir, "a")) + if err != nil { + t.Fatal(err) + } + fd.Close() + } + + dbi := db.OpenMemory() + m := NewModel(defaultConfig, protocol.LocalDeviceID, "syncthing", "dev", dbi, nil) + m.AddFolder(defaultFolderConfig) + m.StartFolder("default") + m.ServeBackground() + defer m.Stop() + m.ScanFolder("default") + + if err = testFs.RemoveAll("toLink"); err != nil { + t.Fatal(err) + } + + if err := osutil.DebugSymlinkForTestsOnly(filepath.Join(testFs.URI(), "linkTarget"), filepath.Join(testFs.URI(), "toLink")); err != nil { + t.Fatal(err) + } + + m.ScanFolder("default") + + if dir, ok := m.CurrentFolderFile("default", filepath.Join("replaceDir", "toLink")); !ok { + t.Fatalf("Dir missing in db") + } else if !dir.IsSymlink() { + t.Errorf("Dir wasn't changed to symlink") + } + if file, ok := m.CurrentFolderFile("default", filepath.Join("replaceDir", "toLink", "a")); !ok { + t.Fatalf("File missing in db") + } else if !file.Deleted { + t.Errorf("File below symlink has not been marked as deleted") + } +} + +// TestIssue4573 tests that contents of an unavailable dir aren't marked deleted +func TestIssue4573(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("Can't make the dir inaccessible on windows") + } + + err := defaultFs.MkdirAll("inaccessible", 0755) + if err != nil { + t.Fatal(err) + } + defer func() { + defaultFs.Chmod("inaccessible", 0777) + defaultFs.RemoveAll("inaccessible") + }() + + file := filepath.Join("inaccessible", "a") + fd, err := defaultFs.Create(file) + if err != nil { + t.Fatal(err) + } + fd.Close() + + dbi := db.OpenMemory() + m := NewModel(defaultConfig, protocol.LocalDeviceID, "syncthing", "dev", dbi, nil) + m.AddFolder(defaultFolderConfig) + m.StartFolder("default") + m.ServeBackground() + defer m.Stop() + m.ScanFolder("default") + + err = defaultFs.Chmod("inaccessible", 0000) + if err != nil { + t.Fatal(err) + } + + m.ScanFolder("default") + + if file, ok := m.CurrentFolderFile("default", file); !ok { + t.Fatalf("File missing in db") + } else if file.Deleted { + t.Errorf("Inaccessible file has been marked as deleted.") + } +} + +// TestInternalScan checks whether various fs operations are correctly represented +// in the db after scanning. 
+func TestInternalScan(t *testing.T) { + err := defaultFs.MkdirAll("internalScan", 0755) + if err != nil { + t.Fatal(err) + } + defer func() { + defaultFs.RemoveAll("internalScan") + }() + + testFs := fs.NewFilesystem(fs.FilesystemTypeBasic, filepath.Join(defaultFs.URI(), "internalScan")) + + testCases := map[string]func(protocol.FileInfo) bool{ + "removeDir": func(f protocol.FileInfo) bool { + return !f.Deleted + }, + "dirToFile": func(f protocol.FileInfo) bool { + return f.Deleted || f.IsDirectory() + }, + } + + baseDirs := []string{"dirToFile", "removeDir"} + for _, dir := range baseDirs { + sub := filepath.Join(dir, "subDir") + for _, dir := range []string{dir, sub} { + err := testFs.MkdirAll(dir, 0775) + if err != nil { + t.Fatalf("%v: %v", dir, err) + } + } + testCases[sub] = func(f protocol.FileInfo) bool { + return !f.Deleted + } + for _, dir := range []string{dir, sub} { + file := filepath.Join(dir, "a") + fd, err := testFs.Create(file) + if err != nil { + t.Fatal(err) + } + fd.Close() + testCases[file] = func(f protocol.FileInfo) bool { + return !f.Deleted + } + } + } + + dbi := db.OpenMemory() + m := NewModel(defaultConfig, protocol.LocalDeviceID, "syncthing", "dev", dbi, nil) + m.AddFolder(defaultFolderConfig) + m.StartFolder("default") + m.ServeBackground() + defer m.Stop() + m.ScanFolder("default") + + for _, dir := range baseDirs { + if err = testFs.RemoveAll(dir); err != nil { + t.Fatal(err) + } + } + + fd, err := testFs.Create("dirToFile") + if err != nil { + t.Fatal(err) + } + fd.Close() + + m.ScanFolder("default") + + for path, cond := range testCases { + if f, ok := m.CurrentFolderFile("default", filepath.Join("internalScan", path)); !ok { + t.Fatalf("%v missing in db", path) + } else if cond(f) { + t.Errorf("Incorrect db entry for %v", path) + } + } +} + func TestCustomMarkerName(t *testing.T) { ldb := db.OpenMemory() set := db.NewFileSet("default", defaultFs, ldb) diff --git a/lib/protocol/bep_extensions.go b/lib/protocol/bep_extensions.go index 6c339f124..2bd2e0e0d 100644 --- a/lib/protocol/bep_extensions.go +++ b/lib/protocol/bep_extensions.go @@ -117,6 +117,10 @@ func (f FileInfo) WinsConflict(other FileInfo) bool { return f.Version.Compare(other.Version) == ConcurrentGreater } +func (f FileInfo) IsEmpty() bool { + return f.Version.Counters == nil +} + func (f *FileInfo) Invalidate(invalidatedBy ShortID) { f.Invalid = true f.ModifiedBy = invalidatedBy @@ -124,6 +128,23 @@ func (f *FileInfo) Invalidate(invalidatedBy ShortID) { f.Sequence = 0 } +func (f *FileInfo) InvalidatedCopy(invalidatedBy ShortID) FileInfo { + copy := *f + copy.Invalidate(invalidatedBy) + return copy +} + +func (f *FileInfo) DeletedCopy(deletedBy ShortID) FileInfo { + copy := *f + copy.Size = 0 + copy.ModifiedBy = deletedBy + copy.Deleted = true + copy.Version = f.Version.Update(deletedBy) + copy.Sequence = 0 + copy.Blocks = nil + return copy +} + func (b BlockInfo) String() string { return fmt.Sprintf("Block{%d/%d/%d/%x}", b.Offset, b.Size, b.WeakHash, b.Hash) } diff --git a/lib/protocol/protocol_test.go b/lib/protocol/protocol_test.go index f19710ba9..6f2249eab 100644 --- a/lib/protocol/protocol_test.go +++ b/lib/protocol/protocol_test.go @@ -24,6 +24,10 @@ var ( quickCfg = &quick.Config{} ) +const ( + fileSize = 1 << 40 +) + func TestPing(t *testing.T) { ar, aw := io.Pipe() br, bw := io.Pipe() @@ -243,12 +247,6 @@ func TestMarshalledIndexMessageSize(t *testing.T) { return } - const ( - maxMessageSize = MaxMessageLen - fileSize = 1 << 40 - blockSize = BlockSize - ) - f := FileInfo{ Name: "a 
normal length file name withoout any weird stuff.txt", Type: FileInfoTypeFile, @@ -256,12 +254,12 @@ func TestMarshalledIndexMessageSize(t *testing.T) { Permissions: 0666, ModifiedS: time.Now().Unix(), Version: Vector{Counters: []Counter{{ID: 1 << 60, Value: 1}, {ID: 2 << 60, Value: 1}}}, - Blocks: make([]BlockInfo, fileSize/blockSize), + Blocks: make([]BlockInfo, fileSize/BlockSize), } - for i := 0; i < fileSize/blockSize; i++ { - f.Blocks[i].Offset = int64(i) * blockSize - f.Blocks[i].Size = blockSize + for i := 0; i < fileSize/BlockSize; i++ { + f.Blocks[i].Offset = int64(i) * BlockSize + f.Blocks[i].Size = BlockSize f.Blocks[i].Hash = []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 20, 1, 2, 3, 4, 5, 6, 7, 8, 9, 30, 1, 2} } @@ -271,8 +269,8 @@ func TestMarshalledIndexMessageSize(t *testing.T) { } msgSize := idx.ProtoSize() - if msgSize > maxMessageSize { - t.Errorf("Message size %d bytes is larger than max %d", msgSize, maxMessageSize) + if msgSize > MaxMessageLen { + t.Errorf("Message size %d bytes is larger than max %d", msgSize, MaxMessageLen) } } @@ -400,3 +398,31 @@ func TestCheckConsistency(t *testing.T) { } } } + +func TestCopyFileInfo(t *testing.T) { + f := FileInfo{ + Name: "a normal length file name withoout any weird stuff.txt", + Type: FileInfoTypeFile, + Size: fileSize, + Permissions: 0666, + ModifiedS: time.Now().Unix(), + Version: Vector{Counters: []Counter{{ID: 1 << 60, Value: 1}, {ID: 2 << 60, Value: 1}}}, + Blocks: make([]BlockInfo, fileSize/BlockSize), + } + + del := f.DeletedCopy(LocalDeviceID.Short()) + if f.Deleted { + t.Errorf("Source file info was deleted on copy") + } + if !del.Deleted { + t.Errorf("Returned file info was not deleted on copy") + } + + inv := f.InvalidatedCopy(LocalDeviceID.Short()) + if f.Invalid { + t.Errorf("Source file info was invalid on copy") + } + if !inv.Invalid { + t.Errorf("Returned file info was not invalid on copy") + } +} diff --git a/lib/scanner/blockqueue.go b/lib/scanner/blockqueue.go index e1ef4be1f..e354423eb 100644 --- a/lib/scanner/blockqueue.go +++ b/lib/scanner/blockqueue.go @@ -65,15 +65,15 @@ type parallelHasher struct { fs fs.Filesystem blockSize int workers int - outbox chan<- protocol.FileInfo - inbox <-chan protocol.FileInfo + outbox chan<- ScanResult + inbox <-chan ScanResult counter Counter done chan<- struct{} useWeakHashes bool wg sync.WaitGroup } -func newParallelHasher(ctx context.Context, fs fs.Filesystem, blockSize, workers int, outbox chan<- protocol.FileInfo, inbox <-chan protocol.FileInfo, counter Counter, done chan<- struct{}, useWeakHashes bool) { +func newParallelHasher(ctx context.Context, fs fs.Filesystem, blockSize, workers int, outbox chan<- ScanResult, inbox <-chan ScanResult, counter Counter, done chan<- struct{}, useWeakHashes bool) { ph := ¶llelHasher{ fs: fs, blockSize: blockSize, @@ -104,25 +104,25 @@ func (ph *parallelHasher) hashFiles(ctx context.Context) { return } - if f.IsDirectory() || f.IsDeleted() { + if f.New.IsDirectory() || f.New.IsDeleted() { panic("Bug. Asked to hash a directory or a deleted file.") } - blocks, err := HashFile(ctx, ph.fs, f.Name, ph.blockSize, ph.counter, ph.useWeakHashes) + blocks, err := HashFile(ctx, ph.fs, f.New.Name, ph.blockSize, ph.counter, ph.useWeakHashes) if err != nil { - l.Debugln("hash error:", f.Name, err) + l.Debugln("hash error:", f.New.Name, err) continue } - f.Blocks = blocks + f.New.Blocks = blocks // The size we saw when initially deciding to hash the file // might not have been the size it actually had when we hashed // it. 
Update the size from the block list. - f.Size = 0 + f.New.Size = 0 for _, b := range blocks { - f.Size += int64(b.Size) + f.New.Size += int64(b.Size) } select { diff --git a/lib/scanner/walk.go b/lib/scanner/walk.go index 06d138dfe..5112f1e87 100644 --- a/lib/scanner/walk.go +++ b/lib/scanner/walk.go @@ -8,7 +8,9 @@ package scanner import ( "context" + "errors" "runtime" + "strings" "sync/atomic" "time" "unicode/utf8" @@ -48,8 +50,8 @@ type Config struct { Matcher *ignore.Matcher // Number of hours to keep temporary files for TempLifetime time.Duration - // If CurrentFiler is not nil, it is queried for the current file before rescanning. - CurrentFiler CurrentFiler + // Walks over file infos as present in the db before the scan alphabetically. + Have haveWalker // The Filesystem provides an abstraction on top of the actual filesystem. Filesystem fs.Filesystem // If IgnorePerms is true, changes to permission bits will not be @@ -70,16 +72,28 @@ type Config struct { UseWeakHashes bool } -type CurrentFiler interface { - // CurrentFile returns the file as seen at last scan. - CurrentFile(name string) (protocol.FileInfo, bool) +type haveWalker interface { + // Walk passes all local file infos from the db which start with prefix + // to out and aborts early if ctx is cancelled. + Walk(prefix string, ctx context.Context, out chan<- protocol.FileInfo) } -func Walk(ctx context.Context, cfg Config) chan protocol.FileInfo { +type fsWalkResult struct { + path string + info fs.FileInfo + err error +} + +type ScanResult struct { + New protocol.FileInfo + Old protocol.FileInfo +} + +func Walk(ctx context.Context, cfg Config) chan ScanResult { w := walker{cfg} - if w.CurrentFiler == nil { - w.CurrentFiler = noCurrentFiler{} + if w.Have == nil { + w.Have = noHaveWalker{} } if w.Filesystem == nil { panic("no filesystem specified") @@ -97,25 +111,19 @@ type walker struct { // Walk returns the list of files found in the local folder by scanning the // file system. Files are blockwise hashed. -func (w *walker) walk(ctx context.Context) chan protocol.FileInfo { +func (w *walker) walk(ctx context.Context) chan ScanResult { l.Debugln("Walk", w.Subs, w.BlockSize, w.Matcher) - toHashChan := make(chan protocol.FileInfo) - finishedChan := make(chan protocol.FileInfo) + haveChan := make(chan protocol.FileInfo) + haveCtx, haveCancel := context.WithCancel(ctx) + go w.dbWalkerRoutine(haveCtx, haveChan) - // A routine which walks the filesystem tree, and sends files which have - // been modified to the counter routine. - go func() { - hashFiles := w.walkAndHashFiles(ctx, toHashChan, finishedChan) - if len(w.Subs) == 0 { - w.Filesystem.Walk(".", hashFiles) - } else { - for _, sub := range w.Subs { - w.Filesystem.Walk(sub, hashFiles) - } - } - close(toHashChan) - }() + fsChan := make(chan fsWalkResult) + go w.fsWalkerRoutine(ctx, fsChan, haveCancel) + + toHashChan := make(chan ScanResult) + finishedChan := make(chan ScanResult) + go w.processWalkResults(ctx, fsChan, haveChan, toHashChan, finishedChan) // We're not required to emit scan progress events, just kick off hashers, // and feed inputs directly from the walker. @@ -139,15 +147,15 @@ func (w *walker) walk(ctx context.Context) chan protocol.FileInfo { // Parallel hasher is stopped by this routine when we close the channel over // which it receives the files we ask it to hash. 
go func() { - var filesToHash []protocol.FileInfo + var filesToHash []ScanResult var total int64 = 1 for file := range toHashChan { filesToHash = append(filesToHash, file) - total += file.Size + total += file.New.Size } - realToHashChan := make(chan protocol.FileInfo) + realToHashChan := make(chan ScanResult) done := make(chan struct{}) progress := newByteCounter() @@ -183,7 +191,7 @@ func (w *walker) walk(ctx context.Context) chan protocol.FileInfo { loop: for _, file := range filesToHash { - l.Debugln("real to hash:", file.Name) + l.Debugln("real to hash:", file.New.Name) select { case realToHashChan <- file: case <-ctx.Done(): @@ -196,15 +204,49 @@ func (w *walker) walk(ctx context.Context) chan protocol.FileInfo { return finishedChan } -func (w *walker) walkAndHashFiles(ctx context.Context, fchan, dchan chan protocol.FileInfo) fs.WalkFunc { - now := time.Now() - return func(path string, info fs.FileInfo, err error) error { +// dbWalkerRoutine walks the db and sends back file infos to be compared to scan results. +func (w *walker) dbWalkerRoutine(ctx context.Context, haveChan chan<- protocol.FileInfo) { + defer close(haveChan) + + if len(w.Subs) == 0 { + w.Have.Walk("", ctx, haveChan) + return + } + + for _, sub := range w.Subs { select { case <-ctx.Done(): - return ctx.Err() + return default: } + w.Have.Walk(sub, ctx, haveChan) + } +} +// fsWalkerRoutine walks the filesystem tree and sends back file infos and potential +// errors at paths that need to be processed. +func (w *walker) fsWalkerRoutine(ctx context.Context, fsChan chan<- fsWalkResult, haveCancel context.CancelFunc) { + defer close(fsChan) + + walkFn := w.createFSWalkFn(ctx, fsChan) + if len(w.Subs) == 0 { + if err := w.Filesystem.Walk(".", walkFn); err != nil { + haveCancel() + } + return + } + + for _, sub := range w.Subs { + if err := w.Filesystem.Walk(sub, walkFn); err != nil { + haveCancel() + break + } + } +} + +func (w *walker) createFSWalkFn(ctx context.Context, fsChan chan<- fsWalkResult) fs.WalkFunc { + now := time.Now() + return func(path string, info fs.FileInfo, err error) error { // Return value used when we are returning early and don't want to // process the item. For directories, this means do-not-descend. var skip error // nil @@ -213,21 +255,14 @@ func (w *walker) walkAndHashFiles(ctx context.Context, fchan, dchan chan protoco skip = fs.SkipDir } - if err != nil { - l.Debugln("error:", path, info, err) - return skip - } - if path == "." 
{ + if err != nil { + fsWalkError(ctx, fsChan, path, err) + return skip + } return nil } - info, err = w.Filesystem.Lstat(path) - // An error here would be weird as we've already gotten to this point, but act on it nonetheless - if err != nil { - return skip - } - if fs.IsTemporary(path) { l.Debugln("temporary:", path) if info.IsRegular() && info.ModTime().Add(w.TempLifetime).Before(now) { @@ -238,48 +273,177 @@ func (w *walker) walkAndHashFiles(ctx context.Context, fchan, dchan chan protoco } if fs.IsInternal(path) { - l.Debugln("ignored (internal):", path) + l.Debugln("skip walking (internal):", path) return skip } if w.Matcher.Match(path).IsIgnored() { - l.Debugln("ignored (patterns):", path) + l.Debugln("skip walking (patterns):", path) + return skip + } + + if err != nil { + if sendErr := fsWalkError(ctx, fsChan, path, err); sendErr != nil { + return sendErr + } return skip } if !utf8.ValidString(path) { + if err := fsWalkError(ctx, fsChan, path, errors.New("path isn't a valid utf8 string")); err != nil { + return err + } l.Warnf("File name %q is not in UTF8 encoding; skipping.", path) return skip } path, shouldSkip := w.normalizePath(path, info) if shouldSkip { + if err := fsWalkError(ctx, fsChan, path, errors.New("failed to normalize path")); err != nil { + return err + } return skip } - switch { - case info.IsSymlink(): - if err := w.walkSymlink(ctx, path, dchan); err != nil { - return err - } - if info.IsDir() { - // under no circumstances shall we descend into a symlink - return fs.SkipDir - } - return nil + select { + case fsChan <- fsWalkResult{ + path: path, + info: info, + err: nil, + }: + case <-ctx.Done(): + return ctx.Err() + } - case info.IsDir(): - err = w.walkDir(ctx, path, info, dchan) - - case info.IsRegular(): - err = w.walkRegular(ctx, path, info, fchan) + // under no circumstances shall we descend into a symlink + if info.IsSymlink() && info.IsDir() { + l.Debugln("skip walking (symlinked directory):", path) + return skip } return err } } -func (w *walker) walkRegular(ctx context.Context, relPath string, info fs.FileInfo, fchan chan protocol.FileInfo) error { +func fsWalkError(ctx context.Context, dst chan<- fsWalkResult, path string, err error) error { + select { + case dst <- fsWalkResult{ + path: path, + info: nil, + err: err, + }: + case <-ctx.Done(): + return ctx.Err() + } + + return nil +} + +func (w *walker) processWalkResults(ctx context.Context, fsChan <-chan fsWalkResult, haveChan <-chan protocol.FileInfo, toHashChan, finishedChan chan<- ScanResult) { + ctxChan := ctx.Done() + fsRes, fsChanOpen := <-fsChan + currDBFile, haveChanOpen := <-haveChan + for fsChanOpen { + if haveChanOpen { + // File infos below an error walking the filesystem tree + // may be marked as ignored but should not be deleted. + if fsRes.err != nil && (strings.HasPrefix(currDBFile.Name, fsRes.path+string(fs.PathSeparator)) || fsRes.path == ".") { + w.checkIgnoredAndInvalidate(currDBFile, finishedChan, ctxChan) + currDBFile, haveChanOpen = <-haveChan + continue + } + // Delete file infos that were not encountered when + // walking the filesystem tree, except on error (see + // above) or if they are ignored. 
+ if currDBFile.Name < fsRes.path { + w.checkIgnoredAndDelete(currDBFile, finishedChan, ctxChan) + currDBFile, haveChanOpen = <-haveChan + continue + } + } + + var oldFile protocol.FileInfo + if haveChanOpen && currDBFile.Name == fsRes.path { + oldFile = currDBFile + currDBFile, haveChanOpen = <-haveChan + } + + if fsRes.err != nil { + if fs.IsNotExist(fsRes.err) && !oldFile.IsEmpty() && !oldFile.Deleted { + select { + case finishedChan <- ScanResult{ + New: oldFile.DeletedCopy(w.ShortID), + Old: oldFile, + }: + case <-ctx.Done(): + return + } + } + fsRes, fsChanOpen = <-fsChan + continue + } + + switch { + case fsRes.info.IsDir(): + w.walkDir(ctx, fsRes.path, fsRes.info, oldFile, finishedChan) + + case fsRes.info.IsSymlink(): + w.walkSymlink(ctx, fsRes.path, oldFile, finishedChan) + + case fsRes.info.IsRegular(): + w.walkRegular(ctx, fsRes.path, fsRes.info, oldFile, toHashChan) + } + + fsRes, fsChanOpen = <-fsChan + } + + // Filesystem tree walking finished, if there is anything left in the + // db, mark it as deleted, except when it's ignored. + if haveChanOpen { + w.checkIgnoredAndDelete(currDBFile, finishedChan, ctxChan) + for currDBFile = range haveChan { + w.checkIgnoredAndDelete(currDBFile, finishedChan, ctxChan) + } + } + + close(toHashChan) +} + +func (w *walker) checkIgnoredAndDelete(f protocol.FileInfo, finishedChan chan<- ScanResult, done <-chan struct{}) { + if w.checkIgnoredAndInvalidate(f, finishedChan, done) { + return + } + + if !f.Invalid && !f.Deleted { + select { + case finishedChan <- ScanResult{ + New: f.DeletedCopy(w.ShortID), + Old: f, + }: + case <-done: + } + } +} + +func (w *walker) checkIgnoredAndInvalidate(f protocol.FileInfo, finishedChan chan<- ScanResult, done <-chan struct{}) bool { + if !w.Matcher.Match(f.Name).IsIgnored() { + return false + } + + if !f.Invalid { + select { + case finishedChan <- ScanResult{ + New: f.InvalidatedCopy(w.ShortID), + Old: f, + }: + case <-done: + } + } + + return true +} + +func (w *walker) walkRegular(ctx context.Context, relPath string, info fs.FileInfo, cf protocol.FileInfo, toHashChan chan<- ScanResult) { curMode := uint32(info.Mode()) if runtime.GOOS == "windows" && osutil.IsWindowsExecutable(relPath) { curMode |= 0111 @@ -294,40 +458,38 @@ func (w *walker) walkRegular(ctx context.Context, relPath string, info fs.FileIn // - was not a symlink (since it's a file now) // - was not invalid (since it looks valid now) // - has the same size as previously - cf, ok := w.CurrentFiler.CurrentFile(relPath) - permUnchanged := w.IgnorePerms || !cf.HasPermissionBits() || PermsEqual(cf.Permissions, curMode) - if ok && permUnchanged && !cf.IsDeleted() && cf.ModTime().Equal(info.ModTime()) && !cf.IsDirectory() && - !cf.IsSymlink() && !cf.IsInvalid() && cf.Size == info.Size() { - return nil - } - - if ok { + if !cf.IsEmpty() { + permUnchanged := w.IgnorePerms || !cf.HasPermissionBits() || PermsEqual(cf.Permissions, curMode) + if permUnchanged && !cf.IsDeleted() && cf.ModTime().Equal(info.ModTime()) && !cf.IsDirectory() && + !cf.IsSymlink() && !cf.IsInvalid() && cf.Size == info.Size() { + return + } l.Debugln("rescan:", cf, info.ModTime().Unix(), info.Mode()&fs.ModePerm) } - f := protocol.FileInfo{ - Name: relPath, - Type: protocol.FileInfoTypeFile, - Version: cf.Version.Update(w.ShortID), - Permissions: curMode & uint32(maskModePerm), - NoPermissions: w.IgnorePerms, - ModifiedS: info.ModTime().Unix(), - ModifiedNs: int32(info.ModTime().Nanosecond()), - ModifiedBy: w.ShortID, - Size: info.Size(), + f := ScanResult{ + New: 
protocol.FileInfo{ + Name: relPath, + Type: protocol.FileInfoTypeFile, + Version: cf.Version.Update(w.ShortID), + Permissions: curMode & uint32(maskModePerm), + NoPermissions: w.IgnorePerms, + ModifiedS: info.ModTime().Unix(), + ModifiedNs: int32(info.ModTime().Nanosecond()), + ModifiedBy: w.ShortID, + Size: info.Size(), + }, + Old: cf, } l.Debugln("to hash:", relPath, f) select { - case fchan <- f: + case toHashChan <- f: case <-ctx.Done(): - return ctx.Err() } - - return nil } -func (w *walker) walkDir(ctx context.Context, relPath string, info fs.FileInfo, dchan chan protocol.FileInfo) error { +func (w *walker) walkDir(ctx context.Context, relPath string, info fs.FileInfo, cf protocol.FileInfo, finishedChan chan<- ScanResult) { // A directory is "unchanged", if it // - exists // - has the same permissions as previously, unless we are ignoring permissions @@ -335,40 +497,41 @@ func (w *walker) walkDir(ctx context.Context, relPath string, info fs.FileInfo, // - was a directory previously (not a file or something else) // - was not a symlink (since it's a directory now) // - was not invalid (since it looks valid now) - cf, ok := w.CurrentFiler.CurrentFile(relPath) - permUnchanged := w.IgnorePerms || !cf.HasPermissionBits() || PermsEqual(cf.Permissions, uint32(info.Mode())) - if ok && permUnchanged && !cf.IsDeleted() && cf.IsDirectory() && !cf.IsSymlink() && !cf.IsInvalid() { - return nil + if !cf.IsEmpty() { + permUnchanged := w.IgnorePerms || !cf.HasPermissionBits() || PermsEqual(cf.Permissions, uint32(info.Mode())) + if permUnchanged && !cf.IsDeleted() && cf.IsDirectory() && !cf.IsSymlink() && !cf.IsInvalid() { + return + } } - f := protocol.FileInfo{ - Name: relPath, - Type: protocol.FileInfoTypeDirectory, - Version: cf.Version.Update(w.ShortID), - Permissions: uint32(info.Mode() & maskModePerm), - NoPermissions: w.IgnorePerms, - ModifiedS: info.ModTime().Unix(), - ModifiedNs: int32(info.ModTime().Nanosecond()), - ModifiedBy: w.ShortID, + f := ScanResult{ + New: protocol.FileInfo{ + Name: relPath, + Type: protocol.FileInfoTypeDirectory, + Version: cf.Version.Update(w.ShortID), + Permissions: uint32(info.Mode() & maskModePerm), + NoPermissions: w.IgnorePerms, + ModifiedS: info.ModTime().Unix(), + ModifiedNs: int32(info.ModTime().Nanosecond()), + ModifiedBy: w.ShortID, + }, + Old: cf, } l.Debugln("dir:", relPath, f) select { - case dchan <- f: + case finishedChan <- f: case <-ctx.Done(): - return ctx.Err() } - - return nil } // walkSymlink returns nil or an error, if the error is of the nature that // it should stop the entire walk. -func (w *walker) walkSymlink(ctx context.Context, relPath string, dchan chan protocol.FileInfo) error { +func (w *walker) walkSymlink(ctx context.Context, relPath string, cf protocol.FileInfo, finishedChan chan<- ScanResult) { // Symlinks are not supported on Windows. We ignore instead of returning // an error. 
if runtime.GOOS == "windows" { - return nil + return } // We always rehash symlinks as they have no modtime or @@ -379,7 +542,7 @@ func (w *walker) walkSymlink(ctx context.Context, relPath string, dchan chan pro target, err := w.Filesystem.ReadSymlink(relPath) if err != nil { l.Debugln("readlink error:", relPath, err) - return nil + return } // A symlink is "unchanged", if @@ -388,28 +551,27 @@ func (w *walker) walkSymlink(ctx context.Context, relPath string, dchan chan pro // - it was a symlink // - it wasn't invalid // - the target was the same - cf, ok := w.CurrentFiler.CurrentFile(relPath) - if ok && !cf.IsDeleted() && cf.IsSymlink() && !cf.IsInvalid() && cf.SymlinkTarget == target { - return nil + if !cf.IsEmpty() && !cf.IsDeleted() && cf.IsSymlink() && !cf.IsInvalid() && cf.SymlinkTarget == target { + return } - f := protocol.FileInfo{ - Name: relPath, - Type: protocol.FileInfoTypeSymlink, - Version: cf.Version.Update(w.ShortID), - NoPermissions: true, // Symlinks don't have permissions of their own - SymlinkTarget: target, + f := ScanResult{ + New: protocol.FileInfo{ + Name: relPath, + Type: protocol.FileInfoTypeSymlink, + Version: cf.Version.Update(w.ShortID), + NoPermissions: true, // Symlinks don't have permissions of their own + SymlinkTarget: target, + }, + Old: cf, } l.Debugln("symlink changedb:", relPath, f) select { - case dchan <- f: + case finishedChan <- f: case <-ctx.Done(): - return ctx.Err() } - - return nil } // normalizePath returns the normalized relative path (possibly after fixing @@ -532,10 +694,6 @@ func (c *byteCounter) Close() { close(c.stop) } -// A no-op CurrentFiler +type noHaveWalker struct{} -type noCurrentFiler struct{} - -func (noCurrentFiler) CurrentFile(name string) (protocol.FileInfo, bool) { - return protocol.FileInfo{}, false -} +func (noHaveWalker) Walk(prefix string, ctx context.Context, out chan<- protocol.FileInfo) {} diff --git a/lib/scanner/walk_test.go b/lib/scanner/walk_test.go index e9121a6b4..a3100e1a1 100644 --- a/lib/scanner/walk_test.go +++ b/lib/scanner/walk_test.go @@ -69,7 +69,7 @@ func TestWalkSub(t *testing.T) { Matcher: ignores, Hashers: 2, }) - var files []protocol.FileInfo + var files []ScanResult for f := range fchan { files = append(files, f) } @@ -80,10 +80,10 @@ func TestWalkSub(t *testing.T) { if len(files) != 2 { t.Fatalf("Incorrect length %d != 2", len(files)) } - if files[0].Name != "dir2" { + if files[0].New.Name != "dir2" { t.Errorf("Incorrect file %v != dir2", files[0]) } - if files[1].Name != filepath.Join("dir2", "cfile") { + if files[1].New.Name != filepath.Join("dir2", "cfile") { t.Errorf("Incorrect file %v != dir2/cfile", files[1]) } } @@ -103,7 +103,7 @@ func TestWalk(t *testing.T) { Hashers: 2, }) - var tmp []protocol.FileInfo + var tmp []ScanResult for f := range fchan { tmp = append(tmp, f) } @@ -251,9 +251,9 @@ func TestNormalization(t *testing.T) { } func TestIssue1507(t *testing.T) { - w := &walker{} - c := make(chan protocol.FileInfo, 100) - fn := w.walkAndHashFiles(context.TODO(), c, c) + w := &walker{Config{Matcher: ignore.New(fs.NewFilesystem(fs.FilesystemTypeBasic, "."))}} + c := make(chan fsWalkResult, 100) + fn := w.createFSWalkFn(context.TODO(), c) fn("", nil, protocol.ErrClosed) } @@ -274,15 +274,14 @@ func TestWalkSymlinkUnix(t *testing.T) { // Scan it files, _ := walkDir(fs.NewFilesystem(fs.FilesystemTypeBasic, "_symlinks"), path) - // Verify that we got one symlink and with the correct attributes if len(files) != 1 { t.Errorf("expected 1 symlink, not %d", len(files)) } - if 
len(files[0].Blocks) != 0 { - t.Errorf("expected zero blocks for symlink, not %d", len(files[0].Blocks)) + if len(files[0].New.Blocks) != 0 { + t.Errorf("expected zero blocks for symlink, not %d", len(files[0].New.Blocks)) } - if files[0].SymlinkTarget != "../testdata" { - t.Errorf("expected symlink to have target destination, not %q", files[0].SymlinkTarget) + if files[0].New.SymlinkTarget != "../testdata" { + t.Errorf("expected symlink to have target destination, not %q", files[0].New.SymlinkTarget) } } } @@ -342,7 +341,7 @@ func TestWalkRootSymlink(t *testing.T) { } } -func walkDir(fs fs.Filesystem, dir string) ([]protocol.FileInfo, error) { +func walkDir(fs fs.Filesystem, dir string) ([]ScanResult, error) { fchan := Walk(context.TODO(), Config{ Filesystem: fs, Subs: []string{dir}, @@ -351,7 +350,7 @@ func walkDir(fs fs.Filesystem, dir string) ([]protocol.FileInfo, error) { Hashers: 2, }) - var tmp []protocol.FileInfo + var tmp []ScanResult for f := range fchan { tmp = append(tmp, f) } @@ -360,14 +359,14 @@ func walkDir(fs fs.Filesystem, dir string) ([]protocol.FileInfo, error) { return tmp, nil } -type fileList []protocol.FileInfo +type fileList []ScanResult func (l fileList) Len() int { return len(l) } func (l fileList) Less(a, b int) bool { - return l[a].Name < l[b].Name + return l[a].New.Name < l[b].New.Name } func (l fileList) Swap(a, b int) { @@ -377,12 +376,12 @@ func (l fileList) Swap(a, b int) { func (l fileList) testfiles() testfileList { testfiles := make(testfileList, len(l)) for i, f := range l { - if len(f.Blocks) > 1 { + if len(f.New.Blocks) > 1 { panic("simple test case stuff only supports a single block per file") } - testfiles[i] = testfile{name: f.Name, length: f.FileSize()} - if len(f.Blocks) == 1 { - testfiles[i].hash = fmt.Sprintf("%x", f.Blocks[0].Hash) + testfiles[i] = testfile{name: f.New.Name, length: f.New.FileSize()} + if len(f.New.Blocks) == 1 { + testfiles[i].hash = fmt.Sprintf("%x", f.New.Blocks[0].Hash) } } return testfiles @@ -465,13 +464,13 @@ func TestStopWalk(t *testing.T) { for { f := <-fchan t.Log("Scanned", f) - if f.IsDirectory() { - if len(f.Name) == 0 || f.Permissions == 0 { + if f.New.IsDirectory() { + if len(f.New.Name) == 0 || f.New.Permissions == 0 { t.Error("Bad directory entry", f) } dirs++ } else { - if len(f.Name) == 0 || len(f.Blocks) == 0 || f.Permissions == 0 { + if len(f.New.Name) == 0 || len(f.New.Blocks) == 0 || f.New.Permissions == 0 { t.Error("Bad file entry", f) } files++ @@ -529,3 +528,69 @@ func verify(r io.Reader, blocksize int, blocks []protocol.BlockInfo) error { return nil } + +// The following (randomish) scenario produced an error uncovered by integration tests +func TestWalkIntegration(t *testing.T) { + tmpDir, err := ioutil.TempDir(".", "_request-") + if err != nil { + panic("Failed to create temporary testing dir") + } + defer os.RemoveAll(tmpDir) + + fs := fs.NewFilesystem(fs.FilesystemTypeBasic, tmpDir) + fs.Mkdir("a", 0777) + toDel := filepath.Join("a", "b") + for _, f := range []string{"b", toDel} { + fi, err := fs.Create(f) + if err != nil { + panic(err) + } + fi.Close() + } + + conf := Config{ + Filesystem: fs, + BlockSize: 128 * 1024, + Hashers: 2, + } + + rchan := Walk(context.TODO(), conf) + + var res []ScanResult + for r := range rchan { + res = append(res, r) + } + sort.Sort(fileList(res)) + thw := make([]protocol.FileInfo, 0, len(res)) + for _, r := range res { + thw = append(thw, r.New) + } + conf.Have = testHaveWalker(thw) + + if err = fs.Remove(toDel); err != nil { + panic(err) + } + + rchan = 
Walk(context.TODO(), conf) + + for r := range rchan { + if r.New.Name != toDel { + t.Fatalf("Received unexpected result %v", r) + } + } +} + +type testHaveWalker []protocol.FileInfo + +func (thw testHaveWalker) Walk(prefix string, ctx context.Context, out chan<- protocol.FileInfo) { + if prefix != "" { + panic("cannot walk with prefix") + } + for _, f := range thw { + select { + case out <- f: + case <-ctx.Done(): + return + } + } +}
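
Taken together, the scanner now emits ScanResult values pairing the freshly scanned file info (New) with the database's previous view (Old), and deletions and invalidations are produced by the walk itself rather than by a separate database pass afterwards. A caller can bucket results from that pair alone; a rough sketch of such a consumer (hypothetical classify helper, not part of this change) could be:

// classify buckets scan results by the kind of change they represent.
// Sketch only; the model's batching loop above is the real consumer.
func classify(results <-chan ScanResult) (changed, deleted, ignored []protocol.FileInfo) {
	for r := range results {
		switch {
		case r.New.Deleted:
			deleted = append(deleted, r.New)
		case r.New.Invalid && !r.Old.Invalid:
			ignored = append(ignored, r.New) // newly ignored files come back invalidated
		default:
			changed = append(changed, r.New)
		}
	}
	return changed, deleted, ignored
}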