lib/model, lib/weakhash: Abort pulling quicker on folder stop (ref #5028)

Simon Frei 2018-07-04 09:07:33 +02:00 committed by Audrius Butkevicius
parent 5bb72dfe5d
commit 0f0290d574
6 changed files with 118 additions and 39 deletions
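The changes below thread the folder's context (f.ctx) through the scan and pull paths so that long-running loops bail out as soon as the folder service is stopped. As a rough, self-contained sketch of the two select patterns the diff applies (not part of the commit itself; doWork, sendRequest and the channels are placeholders, not Syncthing APIs):

package main

import (
	"context"
	"fmt"
)

// Pattern 1: a non-blocking poll of ctx.Done() at the top of a hot loop,
// as inserted into pullerIteration, processDeletions, weakhash.Find etc.
func doWork(ctx context.Context, items []int) error {
	for range items {
		select {
		case <-ctx.Done():
			return ctx.Err() // folder stopped: abort the iteration quickly
		default:
		}
		// ... handle one item ...
	}
	return nil
}

// Pattern 2: a blocking send that also honours cancellation, as used in
// folder.Scan for the scanNow request.
func sendRequest(ctx context.Context, requests chan<- struct{}) error {
	select {
	case requests <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate the folder being stopped
	fmt.Println(doWork(ctx, []int{1, 2, 3}))           // context.Canceled
	fmt.Println(sendRequest(ctx, make(chan struct{}))) // context.Canceled
}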


@@ -206,8 +206,13 @@ func (f *folder) Scan(subdirs []string) error {
         subdirs: subdirs,
         err:     make(chan error),
     }
-    f.scanNow <- req
-    return <-req.err
+    select {
+    case f.scanNow <- req:
+        return <-req.err
+    case <-f.ctx.Done():
+        return f.ctx.Err()
+    }
 }
 
 func (f *folder) Reschedule() {


@@ -159,6 +159,11 @@ func (f *sendReceiveFolder) pull() bool {
     scanChan := make(chan string)
     go f.pullScannerRoutine(scanChan)
 
+    defer func() {
+        close(scanChan)
+        f.setState(FolderIdle)
+    }()
+
     var changed int
     tries := 0
@@ -166,6 +171,13 @@ func (f *sendReceiveFolder) pull() bool {
         tries++
 
         changed = f.pullerIteration(curIgnores, ignoresChanged, scanChan)
+
+        select {
+        case <-f.ctx.Done():
+            return false
+        default:
+        }
+
         l.Debugln(f, "changed", changed)
 
         if changed == 0 {
@@ -189,10 +201,6 @@ func (f *sendReceiveFolder) pull() bool {
         }
     }
 
-    f.setState(FolderIdle)
-
-    close(scanChan)
-
     if changed == 0 {
         f.prevIgnoreHash = curIgnoreHash
         return true
@@ -248,6 +256,32 @@ func (f *sendReceiveFolder) pullerIteration(ignores *ignore.Matcher, ignoresChan
         doneWg.Done()
     }()
 
+    changed, fileDeletions, dirDeletions, err := f.processNeeded(ignores, dbUpdateChan, copyChan, finisherChan, scanChan)
+
+    // Signal copy and puller routines that we are done with the in data for
+    // this iteration. Wait for them to finish.
+    close(copyChan)
+    copyWg.Wait()
+    close(pullChan)
+    pullWg.Wait()
+
+    // Signal the finisher chan that there will be no more input and wait
+    // for it to finish.
+    close(finisherChan)
+    doneWg.Wait()
+
+    if err == nil {
+        f.processDeletions(ignores, fileDeletions, dirDeletions, dbUpdateChan, scanChan)
+    }
+
+    // Wait for db updates and scan scheduling to complete
+    close(dbUpdateChan)
+    updateWg.Wait()
+
+    return changed
+}
+
+func (f *sendReceiveFolder) processNeeded(ignores *ignore.Matcher, dbUpdateChan chan<- dbUpdateJob, copyChan chan<- copyBlocksState, finisherChan chan<- *sharedPullerState, scanChan chan<- string) (int, map[string]protocol.FileInfo, []protocol.FileInfo, error) {
     f.model.fmut.RLock()
     folderFiles := f.model.folderFiles[f.folderID]
     f.model.fmut.RUnlock()
@@ -260,6 +294,12 @@ func (f *sendReceiveFolder) pullerIteration(ignores *ignore.Matcher, ignoresChan
     // (directories, symlinks and deletes) goes into the "process directly"
     // pile.
     folderFiles.WithNeed(protocol.LocalDeviceID, func(intf db.FileIntf) bool {
+        select {
+        case <-f.ctx.Done():
+            return false
+        default:
+        }
+
         if f.IgnoreDelete && intf.IsDeleted() {
             l.Debugln(f, "ignore file deletion (config)", intf.FileName())
             return true
@@ -311,6 +351,12 @@ func (f *sendReceiveFolder) pullerIteration(ignores *ignore.Matcher, ignoresChan
         return true
     })
 
+    select {
+    case <-f.ctx.Done():
+        return changed, nil, nil, f.ctx.Err()
+    default:
+    }
+
     // Sort the "process directly" pile by number of path components. This
     // ensures that we handle parents before children.
@@ -323,6 +369,12 @@ func (f *sendReceiveFolder) pullerIteration(ignores *ignore.Matcher, ignoresChan
     buckets := map[string][]protocol.FileInfo{}
 
     for _, fi := range processDirectly {
+        select {
+        case <-f.ctx.Done():
+            return changed, fileDeletions, dirDeletions, f.ctx.Err()
+        default:
+        }
+
         // Verify that the thing we are handling lives inside a directory,
         // and not a symlink or empty space.
         if err := osutil.TraversesSymlink(f.fs, filepath.Dir(fi.Name)); err != nil {
@@ -389,8 +441,7 @@ nextFile:
     for {
         select {
         case <-f.ctx.Done():
-            // Stop processing files if the puller has been told to stop.
-            break nextFile
+            return changed, fileDeletions, dirDeletions, f.ctx.Err()
         default:
         }
@@ -468,35 +519,32 @@ nextFile:
         f.handleFile(fi, copyChan, finisherChan, dbUpdateChan)
     }
 
-    // Signal copy and puller routines that we are done with the in data for
-    // this iteration. Wait for them to finish.
-    close(copyChan)
-    copyWg.Wait()
-    close(pullChan)
-    pullWg.Wait()
-
-    // Signal the finisher chan that there will be no more input.
-    close(finisherChan)
-
-    // Wait for the finisherChan to finish.
-    doneWg.Wait()
+    return changed, fileDeletions, dirDeletions, nil
+}
 
+func (f *sendReceiveFolder) processDeletions(ignores *ignore.Matcher, fileDeletions map[string]protocol.FileInfo, dirDeletions []protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
     for _, file := range fileDeletions {
+        select {
+        case <-f.ctx.Done():
+            return
+        default:
+        }
+
         l.Debugln(f, "Deleting file", file.Name)
         f.deleteFile(file, dbUpdateChan)
     }
 
     for i := range dirDeletions {
+        select {
+        case <-f.ctx.Done():
+            return
+        default:
+        }
+
         dir := dirDeletions[len(dirDeletions)-i-1]
         l.Debugln(f, "Deleting dir", dir.Name)
         f.handleDeleteDir(dir, ignores, dbUpdateChan, scanChan)
     }
-
-    // Wait for db updates and scan scheduling to complete
-    close(dbUpdateChan)
-    updateWg.Wait()
-
-    return changed
 }
 
 // handleDir creates or updates the given directory
@@ -1097,7 +1145,7 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
         if len(hashesToFind) > 0 {
             file, err = f.fs.Open(state.file.Name)
             if err == nil {
-                weakHashFinder, err = weakhash.NewFinder(file, int(state.file.BlockSize()), hashesToFind)
+                weakHashFinder, err = weakhash.NewFinder(f.ctx, file, int(state.file.BlockSize()), hashesToFind)
                 if err != nil {
                     l.Debugln("weak hasher", err)
                 }
@@ -1109,7 +1157,15 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
             l.Debugf("not weak hashing %s. not enough changed %.02f < %d", state.file.Name, blocksPercentChanged, f.WeakHashThresholdPct)
         }
 
+    blocks:
         for _, block := range state.blocks {
+            select {
+            case <-f.ctx.Done():
+                state.fail("folder stopped", f.ctx.Err())
+                break blocks
+            default:
+            }
+
             if !f.DisableSparseFiles && state.reused == 0 && block.IsEmpty() {
                 // The block is a block of all zeroes, and we are not reusing
                 // a temp file, so there is no need to do anything with it.
@@ -1275,6 +1331,13 @@ func (f *sendReceiveFolder) pullBlock(state pullBlockState, out chan<- *sharedPu
     var lastError error
     candidates := f.model.Availability(f.folderID, state.file, state.block)
     for {
+        select {
+        case <-f.ctx.Done():
+            state.fail("folder stopped", f.ctx.Err())
+            return
+        default:
+        }
+
         // Select the least busy device to pull the block from. If we found no
         // feasible device at all, fail the block (and in the long run, the
         // file).
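The restructuring above splits pullerIteration into processNeeded and processDeletions and keeps the pipeline shutdown in the caller: each stage's input channel is closed and waited on before the next one, and deletions only run once the copier/puller/finisher routines have drained. A simplified sketch of that shutdown order, not part of the commit, with generic stages standing in for the real routines:

package main

import (
	"fmt"
	"sync"
)

// stage models one group of worker goroutines fed by a channel, loosely
// mirroring the copier/puller/finisher routines (names are illustrative).
func stage(name string, in <-chan int, out chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for item := range in {
		if out != nil {
			out <- item
		} else {
			fmt.Println(name, "finished item", item)
		}
	}
}

func main() {
	copyChan := make(chan int)
	pullChan := make(chan int)
	finisherChan := make(chan int)

	var copyWg, pullWg, doneWg sync.WaitGroup
	copyWg.Add(1)
	pullWg.Add(1)
	doneWg.Add(1)
	go stage("copier", copyChan, pullChan, &copyWg)
	go stage("puller", pullChan, finisherChan, &pullWg)
	go stage("finisher", finisherChan, nil, &doneWg)

	// "processNeeded": queue some work.
	for i := 0; i < 3; i++ {
		copyChan <- i
	}

	// Shut down in dependency order: close each stage's input and wait for
	// it before closing the next, so no stage writes to a closed channel.
	close(copyChan)
	copyWg.Wait()
	close(pullChan)
	pullWg.Wait()
	close(finisherChan)
	doneWg.Wait()

	// Only now would deletions and the final db-update/scan bookkeeping run.
	fmt.Println("pipeline drained; safe to process deletions")
}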


@@ -341,6 +341,15 @@ func (m *Model) RemoveFolder(cfg config.FolderConfiguration) {
 }
 
 func (m *Model) tearDownFolderLocked(cfg config.FolderConfiguration) {
+    // Close connections to affected devices
+    // Must happen before stopping the folder service to abort ongoing
+    // transmissions and thus allow timely service termination.
+    for _, dev := range cfg.Devices {
+        if conn, ok := m.conn[dev.DeviceID]; ok {
+            closeRawConn(conn)
+        }
+    }
+
     // Stop the services running for this folder and wait for them to finish
     // stopping to prevent races on restart.
     tokens := m.folderRunnerTokens[cfg.ID]
@@ -352,13 +361,6 @@ func (m *Model) tearDownFolderLocked(cfg config.FolderConfiguration) {
     m.fmut.Lock()
     m.pmut.Lock()
 
-    // Close connections to affected devices
-    for _, dev := range cfg.Devices {
-        if conn, ok := m.conn[dev.DeviceID]; ok {
-            closeRawConn(conn)
-        }
-    }
-
     // Clean up our config maps
     delete(m.folderCfgs, cfg.ID)
     delete(m.folderFiles, cfg.ID)
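The model.go hunks only move the connection-closing loop ahead of stopping the folder runners; the new comment explains why: a puller blocked on a network request only notices the stop once that request fails. An illustrative, self-contained example of why the ordering matters (net.Pipe and the worker function are stand-ins for the real connection and folder service, not Syncthing code):

package main

import (
	"fmt"
	"net"
	"time"
)

// worker models a folder runner blocked on a network read; it only returns
// once the read fails, e.g. because the peer connection was closed.
func worker(conn net.Conn, done chan<- struct{}) {
	buf := make([]byte, 1)
	_, err := conn.Read(buf) // blocks until data arrives or the conn is closed
	fmt.Println("worker unblocked:", err)
	close(done)
}

func main() {
	client, server := net.Pipe() // in-memory connection; nothing is ever written
	done := make(chan struct{})
	go worker(client, done)

	// Teardown order from the commit: close the connection first so blocked
	// transmissions abort, then wait for the service to finish stopping.
	_ = server.Close()

	select {
	case <-done:
		fmt.Println("service stopped promptly")
	case <-time.After(time.Second):
		fmt.Println("service still blocked")
	}
}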


@@ -7,6 +7,7 @@
 package weakhash
 
 import (
+    "context"
     "os"
     "testing"
@@ -28,7 +29,7 @@ func BenchmarkFind1MFile(b *testing.B) {
     if err != nil {
         b.Fatal(err)
     }
-    _, err = Find(fd, []uint32{0, 1, 2}, size)
+    _, err = Find(context.Background(), fd, []uint32{0, 1, 2}, size)
     if err != nil {
         b.Fatal(err)
     }


@@ -8,6 +8,7 @@ package weakhash
 import (
     "bufio"
+    "context"
     "io"
 
     "github.com/chmduquesne/rollinghash/adler32"
@@ -23,7 +24,7 @@ const (
 // Find finds all the blocks of the given size within io.Reader that matches
 // the hashes provided, and returns a hash -> slice of offsets within reader
 // map, that produces the same weak hash.
-func Find(ir io.Reader, hashesToFind []uint32, size int) (map[uint32][]int64, error) {
+func Find(ctx context.Context, ir io.Reader, hashesToFind []uint32, size int) (map[uint32][]int64, error) {
     if ir == nil || len(hashesToFind) == 0 {
         return nil, nil
     }
@@ -50,6 +51,12 @@ func Find(ir io.Reader, hashesToFind []uint32, size int) (map[uint32][]int64, er
     var i int64
     var hash uint32
     for {
+        select {
+        case <-ctx.Done():
+            return nil, ctx.Err()
+        default:
+        }
+
         hash = hf.Sum32()
         if existing, ok := offsets[hash]; ok && len(existing) < maxWeakhashFinderHits {
             offsets[hash] = append(existing, i)
@@ -67,8 +74,8 @@ func Find(ir io.Reader, hashesToFind []uint32, size int) (map[uint32][]int64, er
     return offsets, nil
 }
 
-func NewFinder(ir io.ReadSeeker, size int, hashesToFind []uint32) (*Finder, error) {
-    offsets, err := Find(ir, hashesToFind, size)
+func NewFinder(ctx context.Context, ir io.ReadSeeker, size int, hashesToFind []uint32) (*Finder, error) {
+    offsets, err := Find(ctx, ir, hashesToFind, size)
     if err != nil {
         return nil, err
     }
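With these signatures, callers pass a context into weakhash.Find and weakhash.NewFinder, and the search returns ctx.Err() as soon as the context is cancelled instead of rolling the hash over the rest of the file. A small usage sketch against the new API, assuming the usual github.com/syncthing/syncthing/lib/weakhash import path (the file name, hash value and block size are placeholders):

package main

import (
	"context"
	"fmt"
	"os"

	"github.com/syncthing/syncthing/lib/weakhash"
)

func main() {
	fd, err := os.Open("somefile.bin") // placeholder path
	if err != nil {
		fmt.Println(err)
		return
	}
	defer fd.Close()

	// Cancel this context when the folder stops; Find checks it on every
	// iteration and aborts instead of hashing to the end of the file.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	offsets, err := weakhash.Find(ctx, fd, []uint32{0xdeadbeef}, 128<<10)
	if err != nil {
		fmt.Println("weak hash search aborted:", err)
		return
	}
	fmt.Println("matches:", offsets)
}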


@@ -11,6 +11,7 @@ package weakhash
 import (
     "bytes"
+    "context"
     "io"
     "io/ioutil"
     "os"
@@ -36,7 +37,7 @@ func TestFinder(t *testing.T) {
     }
 
     hashes := []uint32{65143183, 65798547}
-    finder, err := NewFinder(f, 4, hashes)
+    finder, err := NewFinder(context.Background(), f, 4, hashes)
     if err != nil {
         t.Error(err)
     }