diff --git a/cmd/stfileinfo/main.go b/cmd/stfileinfo/main.go index 2264d50a5..4dda3ca7a 100644 --- a/cmd/stfileinfo/main.go +++ b/cmd/stfileinfo/main.go @@ -7,6 +7,7 @@ package main import ( + "context" "flag" "log" "os" @@ -70,7 +71,7 @@ func main() { if *standardBlocks || blockSize < protocol.BlockSize { blockSize = protocol.BlockSize } - bs, err := scanner.Blocks(fd, blockSize, fi.Size(), nil, true) + bs, err := scanner.Blocks(context.TODO(), fd, blockSize, fi.Size(), nil, true) if err != nil { log.Fatal(err) } diff --git a/cmd/syncthing/usage_report.go b/cmd/syncthing/usage_report.go index b8b0328ef..c43958015 100644 --- a/cmd/syncthing/usage_report.go +++ b/cmd/syncthing/usage_report.go @@ -8,6 +8,7 @@ package main import ( "bytes" + "context" "crypto/rand" "crypto/tls" "encoding/json" @@ -309,7 +310,7 @@ func cpuBenchOnce(duration time.Duration, useWeakHash bool, bs []byte) float64 { b := 0 for time.Since(t0) < duration { r := bytes.NewReader(bs) - blocksResult, _ = scanner.Blocks(r, protocol.BlockSize, int64(len(bs)), nil, useWeakHash) + blocksResult, _ = scanner.Blocks(context.TODO(), r, protocol.BlockSize, int64(len(bs)), nil, useWeakHash) b += len(bs) } d := time.Since(t0) diff --git a/lib/fs/basicfs.go b/lib/fs/basicfs.go index 2a0a04975..3e9626d5c 100644 --- a/lib/fs/basicfs.go +++ b/lib/fs/basicfs.go @@ -7,6 +7,7 @@ package fs import ( + "errors" "os" "time" ) @@ -86,6 +87,11 @@ func (f *BasicFilesystem) Create(name string) (File, error) { return fsFile{fd}, err } +func (f *BasicFilesystem) Walk(root string, walkFn WalkFunc) error { + // implemented in WalkFilesystem + return errors.New("not implemented") +} + // fsFile implements the fs.File interface on top of an os.File type fsFile struct { *os.File diff --git a/lib/fs/filesystem.go b/lib/fs/filesystem.go index b6e3905ff..025f911d3 100644 --- a/lib/fs/filesystem.go +++ b/lib/fs/filesystem.go @@ -64,7 +64,7 @@ const ModePerm = FileMode(os.ModePerm) // DefaultFilesystem is the fallback to use when 
nothing explicitly has // been passed. -var DefaultFilesystem Filesystem = NewBasicFilesystem() +var DefaultFilesystem Filesystem = NewWalkFilesystem(NewBasicFilesystem()) // SkipDir is used as a return value from WalkFuncs to indicate that // the directory named in the call is to be skipped. It is not returned diff --git a/lib/fs/basicfs_walk.go b/lib/fs/walkfs.go similarity index 90% rename from lib/fs/basicfs_walk.go rename to lib/fs/walkfs.go index af87173ce..e545cabb3 100644 --- a/lib/fs/basicfs_walk.go +++ b/lib/fs/walkfs.go @@ -28,8 +28,19 @@ import "path/filepath" // Walk skips the remaining files in the containing directory. type WalkFunc func(path string, info FileInfo, err error) error +// WalkFilesystem embeds another Filesystem and supplies the Walk method +// defined in this file on top of it. +type WalkFilesystem struct { + Filesystem +} + +// NewWalkFilesystem returns a WalkFilesystem that wraps next. +func NewWalkFilesystem(next Filesystem) *WalkFilesystem { + return &WalkFilesystem{next} +} + // walk recursively descends path, calling walkFn. -func (f *BasicFilesystem) walk(path string, info FileInfo, walkFn WalkFunc) error { +func (f *WalkFilesystem) walk(path string, info FileInfo, walkFn WalkFunc) error { err := walkFn(path, info, nil) if err != nil { if info.IsDir() && err == SkipDir { @@ -72,7 +83,7 @@ func (f *BasicFilesystem) walk(path string, info FileInfo, walkFn WalkFunc) erro // order, which makes the output deterministic but means that for very // large directories Walk can be inefficient. // Walk does not follow symbolic links.
-func (f *BasicFilesystem) Walk(root string, walkFn WalkFunc) error { +func (f *WalkFilesystem) Walk(root string, walkFn WalkFunc) error { info, err := f.Lstat(root) if err != nil { return walkFn(root, nil, err) diff --git a/lib/model/folder.go b/lib/model/folder.go index 8db0e1d1b..41c74f4ed 100644 --- a/lib/model/folder.go +++ b/lib/model/folder.go @@ -6,14 +6,18 @@ package model -import "time" +import ( + "context" + "time" +) type folder struct { stateTracker scan folderScanner model *Model - stop chan struct{} + ctx context.Context + cancel context.CancelFunc initialScanFinished chan struct{} } @@ -28,8 +32,9 @@ func (f *folder) Scan(subdirs []string) error { <-f.initialScanFinished return f.scan.Scan(subdirs) } + func (f *folder) Stop() { - close(f.stop) + f.cancel() } func (f *folder) Jobs() ([]string, []string) { @@ -39,7 +44,7 @@ func (f *folder) Jobs() ([]string, []string) { func (f *folder) BringToFront(string) {} func (f *folder) scanSubdirs(subDirs []string) error { - if err := f.model.internalScanFolderSubdirs(f.folderID, subDirs); err != nil { + if err := f.model.internalScanFolderSubdirs(f.ctx, f.folderID, subDirs); err != nil { // Potentially sets the error twice, once in the scanner just // by doing a check, and once here, if the error returned is // the same one as returned by CheckFolderHealth, though diff --git a/lib/model/model.go b/lib/model/model.go index 49b6e064b..dfc7a40c3 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -7,6 +7,7 @@ package model import ( + "context" "crypto/tls" "encoding/json" "errors" @@ -1715,7 +1716,7 @@ func (m *Model) ScanFolderSubdirs(folder string, subs []string) error { return runner.Scan(subs) } -func (m *Model) internalScanFolderSubdirs(folder string, subDirs []string) error { +func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, subDirs []string) error { for i := 0; i < len(subDirs); i++ { sub := osutil.NativeFilename(subDirs[i]) @@ -1785,14 +1786,14 @@ func (m *Model) 
internalScanFolderSubdirs(folder string, subDirs []string) error return ok }) - // The cancel channel is closed whenever we return (such as from an error), - // to signal the potentially still running walker to stop. - cancel := make(chan struct{}) - defer close(cancel) + // The derived context is cancelled whenever we return (such as from an + // error), to signal the potentially still running walker to stop. + ctx, cancel := context.WithCancel(ctx) + defer cancel() runner.setState(FolderScanning) - fchan, err := scanner.Walk(scanner.Config{ + fchan, err := scanner.Walk(ctx, scanner.Config{ Folder: folderCfg.ID, Dir: folderCfg.Path(), Subs: subDirs, @@ -1806,7 +1807,6 @@ func (m *Model) internalScanFolderSubdirs(folder string, subDirs []string) error Hashers: m.numHashers(folder), ShortID: m.shortID, ProgressTickIntervalS: folderCfg.ScanProgressIntervalS, - Cancel: cancel, UseWeakHashes: weakhash.Enabled, }) diff --git a/lib/model/model_test.go b/lib/model/model_test.go index a3b2a277a..e6fe48ab1 100644 --- a/lib/model/model_test.go +++ b/lib/model/model_test.go @@ -8,6 +8,7 @@ package model import ( "bytes" + "context" "encoding/json" "fmt" "io/ioutil" @@ -317,7 +318,7 @@ func (f *fakeConnection) addFile(name string, flags uint32, ftype protocol.FileI f.mut.Lock() defer f.mut.Unlock() - blocks, _ := scanner.Blocks(bytes.NewReader(data), protocol.BlockSize, int64(len(data)), nil, true) + blocks, _ := scanner.Blocks(context.TODO(), bytes.NewReader(data), protocol.BlockSize, int64(len(data)), nil, true) var version protocol.Vector version = version.Update(f.id.Short()) diff --git a/lib/model/rofolder.go b/lib/model/rofolder.go index 537595a30..50af80074 100644 --- a/lib/model/rofolder.go +++ b/lib/model/rofolder.go @@ -7,6 +7,7 @@ package model import ( + "context" "fmt" "github.com/syncthing/syncthing/lib/config" @@ -24,11 +25,14 @@ type sendOnlyFolder struct { } func newSendOnlyFolder(model *Model, cfg config.FolderConfiguration, _ versioner.Versioner, _ *fs.MtimeFS) service { + ctx, cancel := context.WithCancel(context.Background()) + return &sendOnlyFolder{ folder: folder{ stateTracker: newStateTracker(cfg.ID), scan: newFolderScanner(cfg), - stop: make(chan 
struct{}), + ctx: ctx, + cancel: cancel, model: model, initialScanFinished: make(chan struct{}), }, @@ -46,7 +50,7 @@ func (f *sendOnlyFolder) Serve() { for { select { - case <-f.stop: + case <-f.ctx.Done(): return case <-f.scan.timer.C: diff --git a/lib/model/rwfolder.go b/lib/model/rwfolder.go index 5df9bae15..344a81cbd 100644 --- a/lib/model/rwfolder.go +++ b/lib/model/rwfolder.go @@ -7,6 +7,7 @@ package model import ( + "context" "errors" "fmt" "math/rand" @@ -99,11 +100,14 @@ type sendReceiveFolder struct { } func newSendReceiveFolder(model *Model, cfg config.FolderConfiguration, ver versioner.Versioner, mtimeFS *fs.MtimeFS) service { + ctx, cancel := context.WithCancel(context.Background()) + f := &sendReceiveFolder{ folder: folder{ stateTracker: newStateTracker(cfg.ID), scan: newFolderScanner(cfg), - stop: make(chan struct{}), + ctx: ctx, + cancel: cancel, model: model, initialScanFinished: make(chan struct{}), }, @@ -171,7 +175,7 @@ func (f *sendReceiveFolder) Serve() { for { select { - case <-f.stop: + case <-f.ctx.Done(): return case <-f.remoteIndex: @@ -492,7 +496,7 @@ func (f *sendReceiveFolder) pullerIteration(ignores *ignore.Matcher) int { nextFile: for { select { - case <-f.stop: + case <-f.ctx.Done(): // Stop processing files if the puller has been told to stop. break nextFile default: @@ -1076,7 +1080,7 @@ func (f *sendReceiveFolder) handleFile(file protocol.FileInfo, copyChan chan<- c // Check for an old temporary file which might have some blocks we could // reuse. 
- tempBlocks, err := scanner.HashFile(fs.DefaultFilesystem, tempName, protocol.BlockSize, nil, false) + tempBlocks, err := scanner.HashFile(f.ctx, fs.DefaultFilesystem, tempName, protocol.BlockSize, nil, false) if err == nil { // Check for any reusable blocks in the temp file tempCopyBlocks, _ := scanner.BlockDiff(tempBlocks, file.Blocks) diff --git a/lib/model/rwfolder_test.go b/lib/model/rwfolder_test.go index 90830d17a..10ef435db 100644 --- a/lib/model/rwfolder_test.go +++ b/lib/model/rwfolder_test.go @@ -7,6 +7,7 @@ package model import ( + "context" "crypto/rand" "io" "os" @@ -83,6 +84,7 @@ func setUpSendReceiveFolder(model *Model) *sendReceiveFolder { stateTracker: newStateTracker("default"), model: model, initialScanFinished: make(chan struct{}), + ctx: context.TODO(), }, mtimeFS: fs.NewMtimeFS(fs.DefaultFilesystem, db.NewNamespacedKV(model.db, "mtime")), @@ -244,7 +246,7 @@ func TestCopierFinder(t *testing.T) { } // Verify that the fetched blocks have actually been written to the temp file - blks, err := scanner.HashFile(fs.DefaultFilesystem, tempFile, protocol.BlockSize, nil, false) + blks, err := scanner.HashFile(context.TODO(), fs.DefaultFilesystem, tempFile, protocol.BlockSize, nil, false) if err != nil { t.Log(err) } @@ -297,7 +299,7 @@ func TestWeakHash(t *testing.T) { // File 1: abcdefgh // File 2: xyabcdef f.Seek(0, os.SEEK_SET) - existing, err := scanner.Blocks(f, protocol.BlockSize, size, nil, true) + existing, err := scanner.Blocks(context.TODO(), f, protocol.BlockSize, size, nil, true) if err != nil { t.Error(err) } @@ -306,7 +308,7 @@ func TestWeakHash(t *testing.T) { remainder := io.LimitReader(f, size-shift) prefix := io.LimitReader(rand.Reader, shift) nf := io.MultiReader(prefix, remainder) - desired, err := scanner.Blocks(nf, protocol.BlockSize, size, nil, true) + desired, err := scanner.Blocks(context.TODO(), nf, protocol.BlockSize, size, nil, true) if err != nil { t.Error(err) } diff --git a/lib/scanner/blockqueue.go 
b/lib/scanner/blockqueue.go index 5e0582709..f164c9f17 100644 --- a/lib/scanner/blockqueue.go +++ b/lib/scanner/blockqueue.go @@ -7,6 +7,7 @@ package scanner import ( + "context" "errors" "path/filepath" @@ -16,7 +17,7 @@ import ( ) // HashFile hashes the files and returns a list of blocks representing the file. -func HashFile(fs fs.Filesystem, path string, blockSize int, counter Counter, useWeakHashes bool) ([]protocol.BlockInfo, error) { +func HashFile(ctx context.Context, fs fs.Filesystem, path string, blockSize int, counter Counter, useWeakHashes bool) ([]protocol.BlockInfo, error) { fd, err := fs.Open(path) if err != nil { l.Debugln("open:", err) @@ -36,7 +37,7 @@ func HashFile(fs fs.Filesystem, path string, blockSize int, counter Counter, use // Hash the file. This may take a while for large files. - blocks, err := Blocks(fd, blockSize, size, counter, useWeakHashes) + blocks, err := Blocks(ctx, fd, blockSize, size, counter, useWeakHashes) if err != nil { l.Debugln("blocks:", err) return nil, err @@ -70,12 +71,11 @@ type parallelHasher struct { inbox <-chan protocol.FileInfo counter Counter done chan<- struct{} - cancel <-chan struct{} useWeakHashes bool wg sync.WaitGroup } -func newParallelHasher(fs fs.Filesystem, dir string, blockSize, workers int, outbox chan<- protocol.FileInfo, inbox <-chan protocol.FileInfo, counter Counter, done chan<- struct{}, cancel <-chan struct{}, useWeakHashes bool) { +func newParallelHasher(ctx context.Context, fs fs.Filesystem, dir string, blockSize, workers int, outbox chan<- protocol.FileInfo, inbox <-chan protocol.FileInfo, counter Counter, done chan<- struct{}, useWeakHashes bool) { ph := ¶llelHasher{ fs: fs, dir: dir, @@ -85,20 +85,19 @@ func newParallelHasher(fs fs.Filesystem, dir string, blockSize, workers int, out inbox: inbox, counter: counter, done: done, - cancel: cancel, useWeakHashes: useWeakHashes, wg: sync.NewWaitGroup(), } for i := 0; i < workers; i++ { ph.wg.Add(1) - go ph.hashFiles() + go ph.hashFiles(ctx) } go 
ph.closeWhenDone() } -func (ph *parallelHasher) hashFiles() { +func (ph *parallelHasher) hashFiles(ctx context.Context) { defer ph.wg.Done() for { @@ -112,7 +111,7 @@ func (ph *parallelHasher) hashFiles() { panic("Bug. Asked to hash a directory or a deleted file.") } - blocks, err := HashFile(ph.fs, filepath.Join(ph.dir, f.Name), ph.blockSize, ph.counter, ph.useWeakHashes) + blocks, err := HashFile(ctx, ph.fs, filepath.Join(ph.dir, f.Name), ph.blockSize, ph.counter, ph.useWeakHashes) if err != nil { l.Debugln("hash error:", f.Name, err) continue @@ -131,11 +130,11 @@ func (ph *parallelHasher) hashFiles() { select { case ph.outbox <- f: - case <-ph.cancel: + case <-ctx.Done(): return } - case <-ph.cancel: + case <-ctx.Done(): return } } diff --git a/lib/scanner/blocks.go b/lib/scanner/blocks.go index 24ca7b6f1..1a08daf31 100644 --- a/lib/scanner/blocks.go +++ b/lib/scanner/blocks.go @@ -8,6 +8,7 @@ package scanner import ( "bytes" + "context" "fmt" "hash" "io" @@ -24,7 +25,7 @@ type Counter interface { } // Blocks returns the blockwise hash of the reader. 
-func Blocks(r io.Reader, blocksize int, sizehint int64, counter Counter, useWeakHashes bool) ([]protocol.BlockInfo, error) { +func Blocks(ctx context.Context, r io.Reader, blocksize int, sizehint int64, counter Counter, useWeakHashes bool) ([]protocol.BlockInfo, error) { hf := sha256.New() hashLength := hf.Size() @@ -57,6 +58,12 @@ func Blocks(r io.Reader, blocksize int, sizehint int64, counter Counter, useWeak var offset int64 lr := io.LimitReader(r, int64(blocksize)).(*io.LimitedReader) for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + lr.N = int64(blocksize) n, err := io.CopyBuffer(mhf, lr, buf) if err != nil { diff --git a/lib/scanner/blocks_test.go b/lib/scanner/blocks_test.go index 68424d365..9dab8b6b0 100644 --- a/lib/scanner/blocks_test.go +++ b/lib/scanner/blocks_test.go @@ -8,6 +8,7 @@ package scanner import ( "bytes" + "context" "crypto/rand" "fmt" origAdler32 "hash/adler32" @@ -68,7 +69,7 @@ var blocksTestData = []struct { func TestBlocks(t *testing.T) { for testNo, test := range blocksTestData { buf := bytes.NewBuffer(test.data) - blocks, err := Blocks(buf, test.blocksize, -1, nil, true) + blocks, err := Blocks(context.TODO(), buf, test.blocksize, -1, nil, true) if err != nil { t.Fatal(err) @@ -125,8 +126,8 @@ var diffTestData = []struct { func TestDiff(t *testing.T) { for i, test := range diffTestData { - a, _ := Blocks(bytes.NewBufferString(test.a), test.s, -1, nil, false) - b, _ := Blocks(bytes.NewBufferString(test.b), test.s, -1, nil, false) + a, _ := Blocks(context.TODO(), bytes.NewBufferString(test.a), test.s, -1, nil, false) + b, _ := Blocks(context.TODO(), bytes.NewBufferString(test.b), test.s, -1, nil, false) _, d := BlockDiff(a, b) if len(d) != len(test.d) { t.Fatalf("Incorrect length for diff %d; %d != %d", i, len(d), len(test.d)) diff --git a/lib/scanner/infinitefs_test.go b/lib/scanner/infinitefs_test.go new file mode 100644 index 000000000..4976c464f --- /dev/null +++ b/lib/scanner/infinitefs_test.go @@ -0,0 
+1,119 @@ +// Copyright (C) 2017 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at https://mozilla.org/MPL/2.0/. + +package scanner + +import ( + "errors" + "fmt" + "io" + "os" + "path/filepath" + "strings" + "time" + + "github.com/syncthing/syncthing/lib/fs" +) + +// infiniteFS is a synthetic, read-only filesystem for tests: directory +// listings and file sizes are generated from the fields below. +type infiniteFS struct { + width int // number of files and directories per level + depth int // number of tree levels to simulate + filesize int64 // size of each file in bytes +} + +// errNotSupp is returned by every operation infiniteFS does not simulate. +var errNotSupp = errors.New("not supported") + +// Lstat returns a fakeInfo for name with the configured file size; it never fails. +func (i infiniteFS) Lstat(name string) (fs.FileInfo, error) { + return fakeInfo{name, i.filesize}, nil +} + +// DirNames returns width fake file names, followed by width fake directory +// names while name has fewer than depth path components. +func (i infiniteFS) DirNames(name string) ([]string, error) { + // Returns a list of fake files and directories. Names are such that + // files appear before directories - this makes it so the scanner will + // actually see a few files without having to reach the max depth.
+ var names []string + for j := 0; j < i.width; j++ { + names = append(names, fmt.Sprintf("aa-file-%d", j)) + } + if len(strings.Split(name, string(os.PathSeparator))) < i.depth { + for j := 0; j < i.width; j++ { + names = append(names, fmt.Sprintf("zz-dir-%d", j)) + } + } + return names, nil +} + +// Open returns a fakeFile of filesize bytes positioned at offset zero. +func (i infiniteFS) Open(name string) (fs.File, error) { + return &fakeFile{name, i.filesize, 0}, nil +} + +// Everything else is unsupported: errNotSupp, or false for SymlinksSupported. +func (infiniteFS) Chmod(name string, mode fs.FileMode) error { return errNotSupp } +func (infiniteFS) Chtimes(name string, atime time.Time, mtime time.Time) error { return errNotSupp } +func (infiniteFS) Create(name string) (fs.File, error) { return nil, errNotSupp } +func (infiniteFS) CreateSymlink(name, target string) error { return errNotSupp } +func (infiniteFS) Mkdir(name string, perm fs.FileMode) error { return errNotSupp } +func (infiniteFS) ReadSymlink(name string) (string, error) { return "", errNotSupp } +func (infiniteFS) Remove(name string) error { return errNotSupp } +func (infiniteFS) Rename(oldname, newname string) error { return errNotSupp } +func (infiniteFS) Stat(name string) (fs.FileInfo, error) { return nil, errNotSupp } +func (infiniteFS) SymlinksSupported() bool { return false } +func (infiniteFS) Walk(root string, walkFn fs.WalkFunc) error { return errNotSupp } + +// fakeInfo is the file info handed out by infiniteFS and fakeFile. Whether it +// describes a directory is derived from the base name, see IsDir. +type fakeInfo struct { + name string + size int64 +} + +func (f fakeInfo) Name() string { return f.name } +func (f fakeInfo) Mode() fs.FileMode { return 0755 } +func (f fakeInfo) Size() int64 { return f.size } +func (f fakeInfo) ModTime() time.Time { return time.Unix(1234567890, 0) } +func (f fakeInfo) IsDir() bool { return strings.Contains(filepath.Base(f.name), "dir") } +func (f fakeInfo) IsRegular() bool { return !f.IsDir() } +func (f fakeInfo) IsSymlink() bool { return false } + +// fakeFile is a read-only file of size bytes; readOffset tracks how many of +// them have been reported as read so far. +type fakeFile struct { + name string + size int64 + readOffset int64 +} + +// Read reports up to len(bs) bytes as read, without writing anything to bs, +// and advances readOffset. It returns io.EOF once size bytes have been +// consumed. +func (f *fakeFile) Read(bs []byte) (int, error) { + remaining := f.size - f.readOffset + if remaining == 0 { + return 0, io.EOF
+ } + if remaining < int64(len(bs)) { + f.readOffset = f.size + return int(remaining), nil + } + f.readOffset += int64(len(bs)) + return len(bs), nil +} + +// Stat returns a fakeInfo carrying the file's name and size. +func (f *fakeFile) Stat() (fs.FileInfo, error) { + return fakeInfo{f.name, f.size}, nil +} + +func (f *fakeFile) WriteAt(bs []byte, offs int64) (int, error) { return 0, errNotSupp } +func (f *fakeFile) Close() error { return nil } +func (f *fakeFile) Truncate(size int64) error { return errNotSupp } diff --git a/lib/scanner/walk.go b/lib/scanner/walk.go index c13522f39..1b7996891 100644 --- a/lib/scanner/walk.go +++ b/lib/scanner/walk.go @@ -7,6 +7,7 @@ package scanner import ( + "context" "errors" "path/filepath" "runtime" @@ -69,8 +70,6 @@ type Config struct { // Optional progress tick interval which defines how often FolderScanProgress // events are emitted. Negative number means disabled. ProgressTickIntervalS int - // Signals cancel from the outside - when closed, we should stop walking. - Cancel chan struct{} // Whether or not we should also compute weak hashes UseWeakHashes bool } @@ -80,7 +79,7 @@ type CurrentFiler interface { CurrentFile(name string) (protocol.FileInfo, bool) } -func Walk(cfg Config) (chan protocol.FileInfo, error) { +func Walk(ctx context.Context, cfg Config) (chan protocol.FileInfo, error) { w := walker{cfg} if w.CurrentFiler == nil { @@ -90,7 +89,7 @@ func Walk(cfg Config) (chan protocol.FileInfo, error) { w.Filesystem = fs.DefaultFilesystem } - return w.walk() + return w.walk(ctx) } type walker struct { @@ -99,7 +98,7 @@ type walker struct { // Walk returns the list of files found in the local folder by scanning the // file system. Files are blockwise hashed. 
-func (w *walker) walk() (chan protocol.FileInfo, error) { +func (w *walker) walk(ctx context.Context) (chan protocol.FileInfo, error) { l.Debugln("Walk", w.Dir, w.Subs, w.BlockSize, w.Matcher) if err := w.checkDir(); err != nil { @@ -112,7 +111,7 @@ func (w *walker) walk() (chan protocol.FileInfo, error) { // A routine which walks the filesystem tree, and sends files which have // been modified to the counter routine. go func() { - hashFiles := w.walkAndHashFiles(toHashChan, finishedChan) + hashFiles := w.walkAndHashFiles(ctx, toHashChan, finishedChan) if len(w.Subs) == 0 { w.Filesystem.Walk(w.Dir, hashFiles) } else { @@ -126,7 +125,7 @@ func (w *walker) walk() (chan protocol.FileInfo, error) { // We're not required to emit scan progress events, just kick off hashers, // and feed inputs directly from the walker. if w.ProgressTickIntervalS < 0 { - newParallelHasher(w.Filesystem, w.Dir, w.BlockSize, w.Hashers, finishedChan, toHashChan, nil, nil, w.Cancel, w.UseWeakHashes) + newParallelHasher(ctx, w.Filesystem, w.Dir, w.BlockSize, w.Hashers, finishedChan, toHashChan, nil, nil, w.UseWeakHashes) return finishedChan, nil } @@ -157,7 +156,7 @@ func (w *walker) walk() (chan protocol.FileInfo, error) { done := make(chan struct{}) progress := newByteCounter() - newParallelHasher(w.Filesystem, w.Dir, w.BlockSize, w.Hashers, finishedChan, realToHashChan, progress, done, w.Cancel, w.UseWeakHashes) + newParallelHasher(ctx, w.Filesystem, w.Dir, w.BlockSize, w.Hashers, finishedChan, realToHashChan, progress, done, w.UseWeakHashes) // A routine which actually emits the FolderScanProgress events // every w.ProgressTicker ticks, until the hasher routines terminate. 
@@ -180,7 +179,7 @@ func (w *walker) walk() (chan protocol.FileInfo, error) { "total": total, "rate": rate, // bytes per second }) - case <-w.Cancel: + case <-ctx.Done(): ticker.Stop() return } @@ -192,7 +191,7 @@ func (w *walker) walk() (chan protocol.FileInfo, error) { l.Debugln("real to hash:", file.Name) select { case realToHashChan <- file: - case <-w.Cancel: + case <-ctx.Done(): break loop } } @@ -202,9 +201,15 @@ func (w *walker) walk() (chan protocol.FileInfo, error) { return finishedChan, nil } -func (w *walker) walkAndHashFiles(fchan, dchan chan protocol.FileInfo) fs.WalkFunc { +func (w *walker) walkAndHashFiles(ctx context.Context, fchan, dchan chan protocol.FileInfo) fs.WalkFunc { now := time.Now() return func(absPath string, info fs.FileInfo, err error) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + // Return value used when we are returning early and don't want to // process the item. For directories, this means do-not-descend. var skip error // nil @@ -265,7 +270,7 @@ func (w *walker) walkAndHashFiles(fchan, dchan chan protocol.FileInfo) fs.WalkFu switch { case info.IsSymlink(): - if err := w.walkSymlink(absPath, relPath, dchan); err != nil { + if err := w.walkSymlink(ctx, absPath, relPath, dchan); err != nil { return err } if info.IsDir() { @@ -275,17 +280,17 @@ func (w *walker) walkAndHashFiles(fchan, dchan chan protocol.FileInfo) fs.WalkFu return nil case info.IsDir(): - err = w.walkDir(relPath, info, dchan) + err = w.walkDir(ctx, relPath, info, dchan) case info.IsRegular(): - err = w.walkRegular(relPath, info, fchan) + err = w.walkRegular(ctx, relPath, info, fchan) } return err } } -func (w *walker) walkRegular(relPath string, info fs.FileInfo, fchan chan protocol.FileInfo) error { +func (w *walker) walkRegular(ctx context.Context, relPath string, info fs.FileInfo, fchan chan protocol.FileInfo) error { curMode := uint32(info.Mode()) if runtime.GOOS == "windows" && osutil.IsWindowsExecutable(relPath) { curMode |= 0111 
@@ -326,14 +331,14 @@ func (w *walker) walkRegular(relPath string, info fs.FileInfo, fchan chan protoc select { case fchan <- f: - case <-w.Cancel: - return errors.New("cancelled") + case <-ctx.Done(): + return ctx.Err() } return nil } -func (w *walker) walkDir(relPath string, info fs.FileInfo, dchan chan protocol.FileInfo) error { +func (w *walker) walkDir(ctx context.Context, relPath string, info fs.FileInfo, dchan chan protocol.FileInfo) error { // A directory is "unchanged", if it // - exists // - has the same permissions as previously, unless we are ignoring permissions @@ -361,8 +366,8 @@ func (w *walker) walkDir(relPath string, info fs.FileInfo, dchan chan protocol.F select { case dchan <- f: - case <-w.Cancel: - return errors.New("cancelled") + case <-ctx.Done(): + return ctx.Err() } return nil @@ -370,7 +375,7 @@ func (w *walker) walkDir(relPath string, info fs.FileInfo, dchan chan protocol.F // walkSymlink returns nil or an error, if the error is of the nature that // it should stop the entire walk. -func (w *walker) walkSymlink(absPath, relPath string, dchan chan protocol.FileInfo) error { +func (w *walker) walkSymlink(ctx context.Context, absPath, relPath string, dchan chan protocol.FileInfo) error { // Symlinks are not supported on Windows. We ignore instead of returning // an error. 
if runtime.GOOS == "windows" { @@ -412,8 +417,8 @@ func (w *walker) walkSymlink(absPath, relPath string, dchan chan protocol.FileIn select { case dchan <- f: - case <-w.Cancel: - return errors.New("cancelled") + case <-ctx.Done(): + return ctx.Err() } return nil diff --git a/lib/scanner/walk_test.go b/lib/scanner/walk_test.go index 16de7c045..725113ee5 100644 --- a/lib/scanner/walk_test.go +++ b/lib/scanner/walk_test.go @@ -8,6 +8,7 @@ package scanner import ( "bytes" + "context" "crypto/rand" "fmt" "io" @@ -59,7 +60,7 @@ func TestWalkSub(t *testing.T) { t.Fatal(err) } - fchan, err := Walk(Config{ + fchan, err := Walk(context.TODO(), Config{ Dir: "testdata", Subs: []string{"dir2"}, BlockSize: 128 * 1024, @@ -96,7 +97,7 @@ func TestWalk(t *testing.T) { } t.Log(ignores) - fchan, err := Walk(Config{ + fchan, err := Walk(context.TODO(), Config{ Dir: "testdata", BlockSize: 128 * 1024, Matcher: ignores, @@ -120,7 +121,7 @@ func TestWalk(t *testing.T) { } func TestWalkError(t *testing.T) { - _, err := Walk(Config{ + _, err := Walk(context.TODO(), Config{ Dir: "testdata-missing", BlockSize: 128 * 1024, Hashers: 2, @@ -130,7 +131,7 @@ func TestWalkError(t *testing.T) { t.Error("no error from missing directory") } - _, err = Walk(Config{ + _, err = Walk(context.TODO(), Config{ Dir: "testdata/bar", BlockSize: 128 * 1024, }) @@ -148,7 +149,7 @@ func TestVerify(t *testing.T) { progress := newByteCounter() defer progress.Close() - blocks, err := Blocks(buf, blocksize, -1, progress, false) + blocks, err := Blocks(context.TODO(), buf, blocksize, -1, progress, false) if err != nil { t.Fatal(err) } @@ -276,7 +277,7 @@ func TestNormalization(t *testing.T) { func TestIssue1507(t *testing.T) { w := &walker{} c := make(chan protocol.FileInfo, 100) - fn := w.walkAndHashFiles(c, c) + fn := w.walkAndHashFiles(context.TODO(), c, c) fn("", nil, protocol.ErrClosed) } @@ -297,7 +298,7 @@ func TestWalkSymlinkUnix(t *testing.T) { // Scan it - fchan, err := Walk(Config{ + fchan, err := 
Walk(context.TODO(), Config{ Dir: "_symlinks", BlockSize: 128 * 1024, }) @@ -342,7 +343,7 @@ func TestWalkSymlinkWindows(t *testing.T) { // Scan it - fchan, err := Walk(Config{ + fchan, err := Walk(context.TODO(), Config{ Dir: "_symlinks", BlockSize: 128 * 1024, }) @@ -364,7 +365,7 @@ func TestWalkSymlinkWindows(t *testing.T) { } func walkDir(dir string) ([]protocol.FileInfo, error) { - fchan, err := Walk(Config{ + fchan, err := Walk(context.TODO(), Config{ Dir: dir, BlockSize: 128 * 1024, AutoNormalize: true, @@ -434,7 +435,7 @@ func BenchmarkHashFile(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - if _, err := HashFile(fs.DefaultFilesystem, testdataName, protocol.BlockSize, nil, true); err != nil { + if _, err := HashFile(context.TODO(), fs.DefaultFilesystem, testdataName, protocol.BlockSize, nil, true); err != nil { b.Fatal(err) } } @@ -458,3 +459,68 @@ func initTestFile() { panic(err) } } + +func TestStopWalk(t *testing.T) { + // Create tree that is 100 levels deep, with each level containing 100 + // files (each 1 MB) and 100 directories (in turn containing 100 files + // and 100 directories, etc). That is, in total > 100^100 files and as + // many directories. It'll take a while to scan, giving us time to + // cancel it and make sure the scan stops. + + fs := fs.NewWalkFilesystem(&infiniteFS{100, 100, 1e6}) + + const numHashers = 4 + ctx, cancel := context.WithCancel(context.Background()) + fchan, err := Walk(ctx, Config{ + Dir: "testdir", + BlockSize: 128 * 1024, + Hashers: numHashers, + Filesystem: fs, + ProgressTickIntervalS: -1, // Don't attempt to build the full list of files before starting to scan... + }) + + if err != nil { + t.Fatal(err) + } + + // Receive a few entries to make sure the walker is up and running, + // scanning both files and dirs. Do some quick sanity tests on the + // returned file entries to make sure we are not just reading crap from + // a closed channel or something. 
+ dirs, files := 0, 0 + for dirs <= 5 || files <= 5 { + f, ok := <-fchan + if !ok { + // Closed early: fail rather than spin on zero-valued entries. + t.Fatal("walker channel closed before enough entries were received") + } + t.Log("Scanned", f) + if f.IsDirectory() { + if len(f.Name) == 0 || f.Permissions == 0 { + t.Error("Bad directory entry", f) + } + dirs++ + } else { + if len(f.Name) == 0 || len(f.Blocks) == 0 || f.Permissions == 0 { + t.Error("Bad file entry", f) + } + files++ + } + } + + // Cancel the walker. + cancel() + + // Empty out any waiting entries and wait for the channel to close. + // Count them, they should be zero or very few - essentially, each + // hasher has the choice of returning a fully handled entry or + // cancelling, but they should not start on another item. + extra := 0 + for range fchan { + extra++ + } + t.Log("Extra entries:", extra) + if extra > numHashers { + t.Error("unexpected extra entries received after cancel") + } +}