From 0f0290d574c43c4c4cf4a4cfe365ad6e6dfe0278 Mon Sep 17 00:00:00 2001 From: Simon Frei Date: Wed, 4 Jul 2018 09:07:33 +0200 Subject: [PATCH] lib/model, lib/weakhash: Abort pulling quicker on folder stop (ref #5028) --- lib/model/folder.go | 9 ++- lib/model/folder_sendrecv.go | 113 +++++++++++++++++++++++++-------- lib/model/model.go | 16 +++-- lib/weakhash/benchmark_test.go | 3 +- lib/weakhash/weakhash.go | 13 +++- lib/weakhash/weakhash_test.go | 3 +- 6 files changed, 118 insertions(+), 39 deletions(-) diff --git a/lib/model/folder.go b/lib/model/folder.go index d29b049c3..fd425f598 100644 --- a/lib/model/folder.go +++ b/lib/model/folder.go @@ -206,8 +206,13 @@ func (f *folder) Scan(subdirs []string) error { subdirs: subdirs, err: make(chan error), } - f.scanNow <- req - return <-req.err + + select { + case f.scanNow <- req: + return <-req.err + case <-f.ctx.Done(): + return f.ctx.Err() + } } func (f *folder) Reschedule() { diff --git a/lib/model/folder_sendrecv.go b/lib/model/folder_sendrecv.go index 6070cc11d..88e14b087 100644 --- a/lib/model/folder_sendrecv.go +++ b/lib/model/folder_sendrecv.go @@ -159,6 +159,11 @@ func (f *sendReceiveFolder) pull() bool { scanChan := make(chan string) go f.pullScannerRoutine(scanChan) + defer func() { + close(scanChan) + f.setState(FolderIdle) + }() + var changed int tries := 0 @@ -166,6 +171,13 @@ func (f *sendReceiveFolder) pull() bool { tries++ changed = f.pullerIteration(curIgnores, ignoresChanged, scanChan) + + select { + case <-f.ctx.Done(): + return false + default: + } + l.Debugln(f, "changed", changed) if changed == 0 { @@ -189,10 +201,6 @@ func (f *sendReceiveFolder) pull() bool { } } - f.setState(FolderIdle) - - close(scanChan) - if changed == 0 { f.prevIgnoreHash = curIgnoreHash return true @@ -248,6 +256,32 @@ func (f *sendReceiveFolder) pullerIteration(ignores *ignore.Matcher, ignoresChan doneWg.Done() }() + changed, fileDeletions, dirDeletions, err := f.processNeeded(ignores, dbUpdateChan, copyChan, finisherChan, scanChan) + + // Signal copy and puller routines that we are done with the in data for + // this iteration. Wait for them to finish. + close(copyChan) + copyWg.Wait() + close(pullChan) + pullWg.Wait() + + // Signal the finisher chan that there will be no more input and wait + // for it to finish. + close(finisherChan) + doneWg.Wait() + + if err == nil { + f.processDeletions(ignores, fileDeletions, dirDeletions, dbUpdateChan, scanChan) + } + + // Wait for db updates and scan scheduling to complete + close(dbUpdateChan) + updateWg.Wait() + + return changed +} + +func (f *sendReceiveFolder) processNeeded(ignores *ignore.Matcher, dbUpdateChan chan<- dbUpdateJob, copyChan chan<- copyBlocksState, finisherChan chan<- *sharedPullerState, scanChan chan<- string) (int, map[string]protocol.FileInfo, []protocol.FileInfo, error) { f.model.fmut.RLock() folderFiles := f.model.folderFiles[f.folderID] f.model.fmut.RUnlock() @@ -260,6 +294,12 @@ func (f *sendReceiveFolder) pullerIteration(ignores *ignore.Matcher, ignoresChan // (directories, symlinks and deletes) goes into the "process directly" // pile. folderFiles.WithNeed(protocol.LocalDeviceID, func(intf db.FileIntf) bool { + select { + case <-f.ctx.Done(): + return false + default: + } + if f.IgnoreDelete && intf.IsDeleted() { l.Debugln(f, "ignore file deletion (config)", intf.FileName()) return true @@ -311,6 +351,12 @@ func (f *sendReceiveFolder) pullerIteration(ignores *ignore.Matcher, ignoresChan return true }) + select { + case <-f.ctx.Done(): + return changed, nil, nil, f.ctx.Err() + default: + } + // Sort the "process directly" pile by number of path components. This // ensures that we handle parents before children. @@ -323,6 +369,12 @@ func (f *sendReceiveFolder) pullerIteration(ignores *ignore.Matcher, ignoresChan buckets := map[string][]protocol.FileInfo{} for _, fi := range processDirectly { + select { + case <-f.ctx.Done(): + return changed, fileDeletions, dirDeletions, f.ctx.Err() + default: + } + // Verify that the thing we are handling lives inside a directory, // and not a symlink or empty space. if err := osutil.TraversesSymlink(f.fs, filepath.Dir(fi.Name)); err != nil { @@ -389,8 +441,7 @@ nextFile: for { select { case <-f.ctx.Done(): - // Stop processing files if the puller has been told to stop. - break nextFile + return changed, fileDeletions, dirDeletions, f.ctx.Err() default: } @@ -468,35 +519,32 @@ nextFile: f.handleFile(fi, copyChan, finisherChan, dbUpdateChan) } - // Signal copy and puller routines that we are done with the in data for - // this iteration. Wait for them to finish. - close(copyChan) - copyWg.Wait() - close(pullChan) - pullWg.Wait() - - // Signal the finisher chan that there will be no more input. - close(finisherChan) - - // Wait for the finisherChan to finish. - doneWg.Wait() + return changed, fileDeletions, dirDeletions, nil +} +func (f *sendReceiveFolder) processDeletions(ignores *ignore.Matcher, fileDeletions map[string]protocol.FileInfo, dirDeletions []protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) { for _, file := range fileDeletions { + select { + case <-f.ctx.Done(): + return + default: + } + l.Debugln(f, "Deleting file", file.Name) f.deleteFile(file, dbUpdateChan) } for i := range dirDeletions { + select { + case <-f.ctx.Done(): + return + default: + } + dir := dirDeletions[len(dirDeletions)-i-1] l.Debugln(f, "Deleting dir", dir.Name) f.handleDeleteDir(dir, ignores, dbUpdateChan, scanChan) } - - // Wait for db updates and scan scheduling to complete - close(dbUpdateChan) - updateWg.Wait() - - return changed } // handleDir creates or updates the given directory @@ -1097,7 +1145,7 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch if len(hashesToFind) > 0 { file, err = f.fs.Open(state.file.Name) if err == nil { - weakHashFinder, err = weakhash.NewFinder(file, int(state.file.BlockSize()), hashesToFind) + weakHashFinder, err = weakhash.NewFinder(f.ctx, file, int(state.file.BlockSize()), hashesToFind) if err != nil { l.Debugln("weak hasher", err) } @@ -1109,7 +1157,15 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch l.Debugf("not weak hashing %s. not enough changed %.02f < %d", state.file.Name, blocksPercentChanged, f.WeakHashThresholdPct) } + blocks: for _, block := range state.blocks { + select { + case <-f.ctx.Done(): + state.fail("folder stopped", f.ctx.Err()) + break blocks + default: + } + if !f.DisableSparseFiles && state.reused == 0 && block.IsEmpty() { // The block is a block of all zeroes, and we are not reusing // a temp file, so there is no need to do anything with it. @@ -1275,6 +1331,13 @@ func (f *sendReceiveFolder) pullBlock(state pullBlockState, out chan<- *sharedPu var lastError error candidates := f.model.Availability(f.folderID, state.file, state.block) for { + select { + case <-f.ctx.Done(): + state.fail("folder stopped", f.ctx.Err()) + return + default: + } + // Select the least busy device to pull the block from. If we found no // feasible device at all, fail the block (and in the long run, the // file). diff --git a/lib/model/model.go b/lib/model/model.go index a26f81e30..6ce39b1c8 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -341,6 +341,15 @@ func (m *Model) RemoveFolder(cfg config.FolderConfiguration) { } func (m *Model) tearDownFolderLocked(cfg config.FolderConfiguration) { + // Close connections to affected devices + // Must happen before stopping the folder service to abort ongoing + // transmissions and thus allow timely service termination. + for _, dev := range cfg.Devices { + if conn, ok := m.conn[dev.DeviceID]; ok { + closeRawConn(conn) + } + } + // Stop the services running for this folder and wait for them to finish // stopping to prevent races on restart. tokens := m.folderRunnerTokens[cfg.ID] @@ -352,13 +361,6 @@ func (m *Model) tearDownFolderLocked(cfg config.FolderConfiguration) { m.fmut.Lock() m.pmut.Lock() - // Close connections to affected devices - for _, dev := range cfg.Devices { - if conn, ok := m.conn[dev.DeviceID]; ok { - closeRawConn(conn) - } - } - // Clean up our config maps delete(m.folderCfgs, cfg.ID) delete(m.folderFiles, cfg.ID) diff --git a/lib/weakhash/benchmark_test.go b/lib/weakhash/benchmark_test.go index a067e9547..791900e4e 100644 --- a/lib/weakhash/benchmark_test.go +++ b/lib/weakhash/benchmark_test.go @@ -7,6 +7,7 @@ package weakhash import ( + "context" "os" "testing" @@ -28,7 +29,7 @@ func BenchmarkFind1MFile(b *testing.B) { if err != nil { b.Fatal(err) } - _, err = Find(fd, []uint32{0, 1, 2}, size) + _, err = Find(context.Background(), fd, []uint32{0, 1, 2}, size) if err != nil { b.Fatal(err) } diff --git a/lib/weakhash/weakhash.go b/lib/weakhash/weakhash.go index 58e6f0d6e..0ecce4c7b 100644 --- a/lib/weakhash/weakhash.go +++ b/lib/weakhash/weakhash.go @@ -8,6 +8,7 @@ package weakhash import ( "bufio" + "context" "io" "github.com/chmduquesne/rollinghash/adler32" @@ -23,7 +24,7 @@ const ( // Find finds all the blocks of the given size within io.Reader that matches // the hashes provided, and returns a hash -> slice of offsets within reader // map, that produces the same weak hash. -func Find(ir io.Reader, hashesToFind []uint32, size int) (map[uint32][]int64, error) { +func Find(ctx context.Context, ir io.Reader, hashesToFind []uint32, size int) (map[uint32][]int64, error) { if ir == nil || len(hashesToFind) == 0 { return nil, nil } @@ -50,6 +51,12 @@ func Find(ir io.Reader, hashesToFind []uint32, size int) (map[uint32][]int64, er var i int64 var hash uint32 for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + hash = hf.Sum32() if existing, ok := offsets[hash]; ok && len(existing) < maxWeakhashFinderHits { offsets[hash] = append(existing, i) @@ -67,8 +74,8 @@ func Find(ir io.Reader, hashesToFind []uint32, size int) (map[uint32][]int64, er return offsets, nil } -func NewFinder(ir io.ReadSeeker, size int, hashesToFind []uint32) (*Finder, error) { - offsets, err := Find(ir, hashesToFind, size) +func NewFinder(ctx context.Context, ir io.ReadSeeker, size int, hashesToFind []uint32) (*Finder, error) { + offsets, err := Find(ctx, ir, hashesToFind, size) if err != nil { return nil, err } diff --git a/lib/weakhash/weakhash_test.go b/lib/weakhash/weakhash_test.go index 0f3927595..0d9ca91c2 100644 --- a/lib/weakhash/weakhash_test.go +++ b/lib/weakhash/weakhash_test.go @@ -11,6 +11,7 @@ package weakhash import ( "bytes" + "context" "io" "io/ioutil" "os" @@ -36,7 +37,7 @@ func TestFinder(t *testing.T) { } hashes := []uint32{65143183, 65798547} - finder, err := NewFinder(f, 4, hashes) + finder, err := NewFinder(context.Background(), f, 4, hashes) if err != nil { t.Error(err) }