From d3ed4de4ed6e2f061f35df4b270359820145f1c9 Mon Sep 17 00:00:00 2001 From: Simon Frei Date: Tue, 21 Apr 2020 19:55:14 +0200 Subject: [PATCH] lib/model: Don't exit pullerRoutine on cancelled ctx (fixes #6559) (#6562) * lib/model: Don't exit pullerRoutine on cancelled ctx (fixes #6559) * actual fix --- lib/model/bytesemaphore.go | 6 +++++ lib/model/folder_sendrecv.go | 26 ++++--------------- lib/model/folder_sendrecv_test.go | 43 +++++++++++++++++++++++++++++++ lib/model/sharedpullerstate.go | 22 ++++++++++++++++ 4 files changed, 76 insertions(+), 21 deletions(-) diff --git a/lib/model/bytesemaphore.go b/lib/model/bytesemaphore.go index 4a7c5f3ee..92feecf9e 100644 --- a/lib/model/bytesemaphore.go +++ b/lib/model/bytesemaphore.go @@ -51,6 +51,12 @@ func (s *byteSemaphore) take(bytes int) { } func (s *byteSemaphore) takeInner(ctx context.Context, bytes int) error { + // Checking context for bytes <= s.available is required for testing and doesn't do any harm. + select { + case <-ctx.Done(): + return ctx.Err() + default: + } s.mut.Lock() defer s.mut.Unlock() if bytes > s.max { diff --git a/lib/model/folder_sendrecv.go b/lib/model/folder_sendrecv.go index 5bd8ba87b..138d9ff30 100644 --- a/lib/model/folder_sendrecv.go +++ b/lib/model/folder_sendrecv.go @@ -1079,30 +1079,12 @@ func (f *sendReceiveFolder) handleFile(file protocol.FileInfo, snap *db.Snapshot "action": "update", }) - s := sharedPullerState{ - file: file, - fs: f.fs, - folder: f.folderID, - tempName: tempName, - realName: file.Name, - copyTotal: len(blocks), - copyNeeded: len(blocks), - reused: len(reused), - updated: time.Now(), - available: reused, - availableUpdated: time.Now(), - ignorePerms: f.IgnorePerms || file.NoPermissions, - hasCurFile: hasCurFile, - curFile: curFile, - mut: sync.NewRWMutex(), - sparse: !f.DisableSparseFiles, - created: time.Now(), - } + s := newSharedPullerState(file, f.fs, f.folderID, tempName, blocks, reused, f.IgnorePerms || file.NoPermissions, hasCurFile, curFile, !f.DisableSparseFiles) l.Debugf("%v need file %s; copy %d, reused %v", f, file.Name, len(blocks), len(reused)) cs := copyBlocksState{ - sharedPullerState: &s, + sharedPullerState: s, blocks: blocks, have: len(have), } @@ -1384,7 +1366,9 @@ func (f *sendReceiveFolder) pullerRoutine(in <-chan pullBlockState, out chan<- * bytes := int(state.block.Size) if err := requestLimiter.takeWithContext(f.ctx, bytes); err != nil { - break + state.fail(err) + out <- state.sharedPullerState + continue } wg.Add(1) diff --git a/lib/model/folder_sendrecv_test.go b/lib/model/folder_sendrecv_test.go index 0de6bf7dd..326d1f217 100644 --- a/lib/model/folder_sendrecv_test.go +++ b/lib/model/folder_sendrecv_test.go @@ -1014,6 +1014,49 @@ func TestDeleteBehindSymlink(t *testing.T) { } } +// Reproduces https://github.com/syncthing/syncthing/issues/6559 +func TestPullCtxCancel(t *testing.T) { + m, f := setupSendReceiveFolder() + defer cleanupSRFolder(f, m) + + pullChan := make(chan pullBlockState) + finisherChan := make(chan *sharedPullerState) + + var cancel context.CancelFunc + f.ctx, cancel = context.WithCancel(context.Background()) + + go f.pullerRoutine(pullChan, finisherChan) + defer close(pullChan) + + emptyState := func() pullBlockState { + return pullBlockState{ + sharedPullerState: newSharedPullerState(protocol.FileInfo{}, nil, f.folderID, "", nil, nil, false, false, protocol.FileInfo{}, false), + block: protocol.BlockInfo{}, + } + } + + cancel() + + done := make(chan struct{}) + defer close(done) + for i := 0; i < 2; i++ { + go func() { + select { + case pullChan <- emptyState(): + case <-done: + } + }() + select { + case s := <-finisherChan: + if s.failed() == nil { + t.Errorf("state %v not failed", i) + } + case <-time.After(5 * time.Second): + t.Fatalf("timed out before receiving state %v on finisherChan", i) + } + } +} + func cleanupSharedPullerState(s *sharedPullerState) { s.mut.Lock() defer s.mut.Unlock() diff --git a/lib/model/sharedpullerstate.go b/lib/model/sharedpullerstate.go index cb2313dd6..37aa625be 100644 --- a/lib/model/sharedpullerstate.go +++ b/lib/model/sharedpullerstate.go @@ -49,6 +49,28 @@ type sharedPullerState struct { mut sync.RWMutex // Protects the above } +func newSharedPullerState(file protocol.FileInfo, fs fs.Filesystem, folderID, tempName string, blocks []protocol.BlockInfo, reused []int32, ignorePerms, hasCurFile bool, curFile protocol.FileInfo, sparse bool) *sharedPullerState { + return &sharedPullerState{ + file: file, + fs: fs, + folder: folderID, + tempName: tempName, + realName: file.Name, + copyTotal: len(blocks), + copyNeeded: len(blocks), + reused: len(reused), + updated: time.Now(), + available: reused, + availableUpdated: time.Now(), + ignorePerms: ignorePerms, + hasCurFile: hasCurFile, + curFile: curFile, + mut: sync.NewRWMutex(), + sparse: sparse, + created: time.Now(), + } +} + // A momentary state representing the progress of the puller type pullerProgress struct { Total int `json:"total"`