mirror of
https://github.com/octoleo/syncthing.git
synced 2025-01-10 18:24:44 +00:00
The problem here is that we would update the sequence index before updating the FileInfos, which would result in a high sequence number pointing to a low-sequence FileInfo. The index sender would pick up the high sequence number, send the old file, and think everything was good. On the receiving side the old file is a no-op and ignored. The file remains out of sync until another update for it happens. This fixes that by correcting the order of operations in the database update: first we remove old sequence index entries, then we update the FileInfos (which now don't have anything pointing to them) and then we add the sequence indexes (which the index sender can see). The other option is to add "proper" transactions where required at the database layer. I actually have a branch for that, but it's literally thousands of lines of diff and I'm putting that off for another day as this solves the problem...
This commit is contained in:
parent
7c0798b622
commit
b80da29b23
@ -20,3 +20,7 @@ var (
|
||||
func init() {
|
||||
l.SetDebug("db", strings.Contains(os.Getenv("STTRACE"), "db") || os.Getenv("STTRACE") == "all")
|
||||
}
|
||||
|
||||
func shouldDebug() bool {
|
||||
return l.ShouldDebug("db")
|
||||
}
|
||||
|
@ -221,6 +221,14 @@ func (db *Instance) withHaveSequence(folder []byte, startSeq int64, fn Iterator)
|
||||
l.Debugln("missing file for sequence number", db.sequenceKeySequence(dbi.Key()))
|
||||
continue
|
||||
}
|
||||
|
||||
if shouldDebug() {
|
||||
key := dbi.Key()
|
||||
seq := int64(binary.BigEndian.Uint64(key[keyPrefixLen+keyFolderLen:]))
|
||||
if f.Sequence != seq {
|
||||
panic(fmt.Sprintf("sequence index corruption, file sequence %d != expected %d", f.Sequence, seq))
|
||||
}
|
||||
}
|
||||
if !fn(f) {
|
||||
return
|
||||
}
|
||||
|
@ -14,6 +14,7 @@ package db
|
||||
|
||||
import (
|
||||
"os"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/syncthing/syncthing/lib/fs"
|
||||
@ -139,38 +140,56 @@ func (s *FileSet) Update(device protocol.DeviceID, fs []protocol.FileInfo) {
|
||||
s.updateMutex.Lock()
|
||||
defer s.updateMutex.Unlock()
|
||||
|
||||
if device == protocol.LocalDeviceID {
|
||||
discards := make([]protocol.FileInfo, 0, len(fs))
|
||||
updates := make([]protocol.FileInfo, 0, len(fs))
|
||||
// db.UpdateFiles will sort unchanged files out -> save one db lookup
|
||||
// filter slice according to https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
|
||||
oldFs := fs
|
||||
fs = fs[:0]
|
||||
var dk []byte
|
||||
folder := []byte(s.folder)
|
||||
for _, nf := range oldFs {
|
||||
dk = s.db.deviceKeyInto(dk, folder, device[:], []byte(osutil.NormalizedFilename(nf.Name)))
|
||||
ef, ok := s.db.getFile(dk)
|
||||
if ok && ef.Version.Equal(nf.Version) && ef.IsInvalid() == nf.IsInvalid() {
|
||||
continue
|
||||
}
|
||||
defer s.meta.toDB(s.db, []byte(s.folder))
|
||||
|
||||
nf.Sequence = s.meta.nextSeq(protocol.LocalDeviceID)
|
||||
fs = append(fs, nf)
|
||||
|
||||
if ok {
|
||||
discards = append(discards, ef)
|
||||
}
|
||||
updates = append(updates, nf)
|
||||
}
|
||||
s.blockmap.Discard(discards)
|
||||
s.blockmap.Update(updates)
|
||||
s.db.removeSequences(folder, discards)
|
||||
s.db.addSequences(folder, updates)
|
||||
if device != protocol.LocalDeviceID {
|
||||
// Easy case, just update the files and we're done.
|
||||
s.db.updateFiles([]byte(s.folder), device[:], fs, s.meta)
|
||||
return
|
||||
}
|
||||
|
||||
// For the local device we have a bunch of metadata to track however...
|
||||
|
||||
discards := make([]protocol.FileInfo, 0, len(fs))
|
||||
updates := make([]protocol.FileInfo, 0, len(fs))
|
||||
// db.UpdateFiles will sort unchanged files out -> save one db lookup
|
||||
// filter slice according to https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
|
||||
oldFs := fs
|
||||
fs = fs[:0]
|
||||
var dk []byte
|
||||
folder := []byte(s.folder)
|
||||
for _, nf := range oldFs {
|
||||
dk = s.db.deviceKeyInto(dk, folder, device[:], []byte(osutil.NormalizedFilename(nf.Name)))
|
||||
ef, ok := s.db.getFile(dk)
|
||||
if ok && ef.Version.Equal(nf.Version) && ef.IsInvalid() == nf.IsInvalid() {
|
||||
continue
|
||||
}
|
||||
|
||||
nf.Sequence = s.meta.nextSeq(protocol.LocalDeviceID)
|
||||
fs = append(fs, nf)
|
||||
|
||||
if ok {
|
||||
discards = append(discards, ef)
|
||||
}
|
||||
updates = append(updates, nf)
|
||||
}
|
||||
|
||||
// The ordering here is important. We first remove stuff that point to
|
||||
// files we are going to update, then update them, then add new index
|
||||
// pointers etc. In addition, we do the discards in reverse order so
|
||||
// that a reader traversing the sequence index will get a consistent
|
||||
// view up until the point they meet the writer.
|
||||
|
||||
sort.Slice(discards, func(a, b int) bool {
|
||||
// n.b. "b < a" instead of the usual "a < b"
|
||||
return discards[b].Sequence < discards[a].Sequence
|
||||
})
|
||||
|
||||
s.blockmap.Discard(discards)
|
||||
s.db.removeSequences(folder, discards)
|
||||
s.db.updateFiles([]byte(s.folder), device[:], fs, s.meta)
|
||||
s.meta.toDB(s.db, []byte(s.folder))
|
||||
s.db.addSequences(folder, updates)
|
||||
s.blockmap.Update(updates)
|
||||
}
|
||||
|
||||
func (s *FileSet) WithNeed(device protocol.DeviceID, fn Iterator) {
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
"os"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/d4l3k/messagediff"
|
||||
"github.com/syncthing/syncthing/lib/db"
|
||||
@ -914,6 +915,56 @@ func TestWithHaveSequence(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestStressWithHaveSequence(t *testing.T) {
|
||||
// This races two loops against each other: one that contiously does
|
||||
// updates, and one that continously does sequence walks. The test fails
|
||||
// if the sequence walker sees a discontinuity.
|
||||
|
||||
if testing.Short() {
|
||||
t.Skip("Takes a long time")
|
||||
}
|
||||
|
||||
ldb := db.OpenMemory()
|
||||
|
||||
folder := "test"
|
||||
s := db.NewFileSet(folder, fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb)
|
||||
|
||||
var localHave []protocol.FileInfo
|
||||
for i := 0; i < 100; i++ {
|
||||
localHave = append(localHave, protocol.FileInfo{Name: fmt.Sprintf("file%d", i), Blocks: genBlocks(i * 10)})
|
||||
}
|
||||
|
||||
done := make(chan struct{})
|
||||
t0 := time.Now()
|
||||
go func() {
|
||||
for time.Since(t0) < 10*time.Second {
|
||||
for j, f := range localHave {
|
||||
localHave[j].Version = f.Version.Update(42)
|
||||
}
|
||||
|
||||
s.Update(protocol.LocalDeviceID, localHave)
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
|
||||
var prevSeq int64 = 0
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
break loop
|
||||
default:
|
||||
}
|
||||
s.WithHaveSequence(prevSeq+1, func(fi db.FileIntf) bool {
|
||||
if fi.SequenceNo() < prevSeq+1 {
|
||||
t.Fatal("Skipped ", prevSeq+1, fi.SequenceNo())
|
||||
}
|
||||
prevSeq = fi.SequenceNo()
|
||||
return true
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestIssue4925(t *testing.T) {
|
||||
ldb := db.OpenMemory()
|
||||
|
||||
|
@ -1765,6 +1765,15 @@ func sendIndexTo(prevSequence int64, conn protocol.Connection, folder string, fs
|
||||
batchSizeBytes = 0
|
||||
}
|
||||
|
||||
if shouldDebug() {
|
||||
if fi.SequenceNo() < prevSequence+1 {
|
||||
panic(fmt.Sprintln("sequence lower than requested, got:", fi.SequenceNo(), ", asked to start at:", prevSequence+1))
|
||||
}
|
||||
if f.Sequence > 0 && fi.SequenceNo() <= f.Sequence {
|
||||
panic(fmt.Sprintln("non-increasing sequence, current:", fi.SequenceNo(), "<= previous:", f.Sequence))
|
||||
}
|
||||
}
|
||||
|
||||
f = fi.(protocol.FileInfo)
|
||||
|
||||
// Mark the file as invalid if any of the local bad stuff flags are set.
|
||||
|
Loading…
Reference in New Issue
Block a user