From 1e69997ecdbf87ceaad76bd0149d98f560f4fdb5 Mon Sep 17 00:00:00 2001
From: Jakob Borg
Date: Fri, 18 Jan 2019 11:34:18 +0100
Subject: [PATCH] lib/db: Fix iterating sequence index (fixes #5340) (#5462)

There was a problem in iterating the sequence index that could result
in missing updates. The issue is that while the index was (correctly)
iterated in a snapshot, the actual file infos were read dirty outside
of the snapshot. This fixes that by doing the reads inside the
snapshot, and also updates a couple of other places that did the same
thing more or less harmfully (I didn't investigate).

To avoid similar issues in the future I did some renaming of the
getFile* methods - the ones in a transaction are just getFile, while
the ones directly on the database are variants of getFileDirty to
highlight what's going on.
---
 lib/db/db_test.go       |   2 +-
 lib/db/instance.go      |  50 ++++++++------------
 lib/db/schemaupdater.go |   2 +-
 lib/db/set.go           |   6 +--
 lib/db/set_test.go      | 102 ++++++++++++++++++++++++++++++++++++++++
 lib/db/structs.go       |   4 +-
 lib/db/transactions.go  |  29 +++++++++++-
 7 files changed, 155 insertions(+), 40 deletions(-)

diff --git a/lib/db/db_test.go b/lib/db/db_test.go
index 40ec99c05..8c6942f67 100644
--- a/lib/db/db_test.go
+++ b/lib/db/db_test.go
@@ -149,7 +149,7 @@ func TestUpdate0to3(t *testing.T) {
 
 	updater.updateSchema0to1()
 
-	if _, ok := db.getFile(db.keyer.GenerateDeviceFileKey(nil, folder, protocol.LocalDeviceID[:], []byte(slashPrefixed))); ok {
+	if _, ok := db.getFileDirty(folder, protocol.LocalDeviceID[:], []byte(slashPrefixed)); ok {
 		t.Error("File prefixed by '/' was not removed during transition to schema 1")
 	}
 
diff --git a/lib/db/instance.go b/lib/db/instance.go
index e69061f9b..899416bc9 100644
--- a/lib/db/instance.go
+++ b/lib/db/instance.go
@@ -99,6 +99,9 @@ func (db *instance) removeSequences(folder []byte, fs []protocol.FileInfo) {
 }
 
 func (db *instance) withHave(folder, device, prefix []byte, truncate bool, fn Iterator) {
+	t := db.newReadOnlyTransaction()
+	defer t.close()
+
 	if len(prefix) > 0 {
 		unslashedPrefix := prefix
 		if bytes.HasSuffix(prefix, []byte{'/'}) {
@@ -107,14 +110,11 @@ func (db *instance) withHave(folder, device, prefix []byte, truncate bool, fn It
 			prefix = append(prefix, '/')
 		}
 
-		if f, ok := db.getFileTrunc(db.keyer.GenerateDeviceFileKey(nil, folder, device, unslashedPrefix), true); ok && !fn(f) {
+		if f, ok := t.getFileTrunc(db.keyer.GenerateDeviceFileKey(nil, folder, device, unslashedPrefix), true); ok && !fn(f) {
 			return
 		}
 	}
 
-	t := db.newReadOnlyTransaction()
-	defer t.close()
-
 	dbi := t.NewIterator(util.BytesPrefix(db.keyer.GenerateDeviceFileKey(nil, folder, device, prefix)), nil)
 	defer dbi.Release()
 
@@ -124,11 +124,7 @@ func (db *instance) withHave(folder, device, prefix []byte, truncate bool, fn It
 			return
 		}
-		// The iterator function may keep a reference to the unmarshalled
-		// struct, which in turn references the buffer it was unmarshalled
-		// from. dbi.Value() just returns an internal slice that it reuses, so
-		// we need to copy it.
-		f, err := unmarshalTrunc(append([]byte{}, dbi.Value()...), truncate)
+		f, err := unmarshalTrunc(dbi.Value(), truncate)
 		if err != nil {
 			l.Debugln("unmarshal error:", err)
 			continue
 		}
@@ -147,7 +143,7 @@ func (db *instance) withHaveSequence(folder []byte, startSeq int64, fn Iterator)
 	defer dbi.Release()
 
 	for dbi.Next() {
-		f, ok := db.getFile(dbi.Value())
+		f, ok := t.getFileByKey(dbi.Value())
 		if !ok {
 			l.Debugln("missing file for sequence number", db.keyer.SequenceFromSequenceKey(dbi.Key()))
 			continue
@@ -209,27 +205,21 @@ func (db *instance) withAllFolderTruncated(folder []byte, fn func(device []byte,
 	}
 }
 
-func (db *instance) getFile(key []byte) (protocol.FileInfo, bool) {
-	if f, ok := db.getFileTrunc(key, false); ok {
-		return f.(protocol.FileInfo), true
-	}
-	return protocol.FileInfo{}, false
-}
-
-func (db *instance) getFileTrunc(key []byte, trunc bool) (FileIntf, bool) {
+func (db *instance) getFileDirty(folder, device, file []byte) (protocol.FileInfo, bool) {
+	key := db.keyer.GenerateDeviceFileKey(nil, folder, device, file)
 	bs, err := db.Get(key, nil)
 	if err == leveldb.ErrNotFound {
-		return nil, false
+		return protocol.FileInfo{}, false
 	}
 	if err != nil {
 		l.Debugln("surprise error:", err)
-		return nil, false
+		return protocol.FileInfo{}, false
 	}
 
-	f, err := unmarshalTrunc(bs, trunc)
-	if err != nil {
+	var f protocol.FileInfo
+	if err := f.Unmarshal(bs); err != nil {
 		l.Debugln("unmarshal error:", err)
-		return nil, false
+		return protocol.FileInfo{}, false
 	}
 	return f, true
 }
@@ -256,7 +246,7 @@ func (db *instance) getGlobalInto(t readOnlyTransaction, gk, dk, folder, file []
 	}
 
 	dk = db.keyer.GenerateDeviceFileKey(dk, folder, vl.Versions[0].Device, file)
-	if fi, ok := db.getFileTrunc(dk, truncate); ok {
+	if fi, ok := t.getFileTrunc(dk, truncate); ok {
 		return gk, dk, fi, true
 	}
 
@@ -264,6 +254,9 @@ func (db *instance) getGlobalInto(t readOnlyTransaction, gk, dk, folder, file []
 }
 
 func (db *instance) withGlobal(folder, prefix []byte, truncate bool, fn Iterator) {
+	t := db.newReadOnlyTransaction()
+	defer t.close()
+
 	if len(prefix) > 0 {
 		unslashedPrefix := prefix
 		if bytes.HasSuffix(prefix, []byte{'/'}) {
@@ -272,14 +265,11 @@ func (db *instance) withGlobal(folder, prefix []byte, truncate bool, fn Iterator
 			prefix = append(prefix, '/')
 		}
 
-		if f, ok := db.getGlobal(folder, unslashedPrefix, truncate); ok && !fn(f) {
+		if _, _, f, ok := db.getGlobalInto(t, nil, nil, folder, unslashedPrefix, truncate); ok && !fn(f) {
 			return
 		}
 	}
 
-	t := db.newReadOnlyTransaction()
-	defer t.close()
-
 	dbi := t.NewIterator(util.BytesPrefix(db.keyer.GenerateGlobalVersionKey(nil, folder, prefix)), nil)
 	defer dbi.Release()
 
@@ -297,7 +287,7 @@ func (db *instance) withGlobal(folder, prefix []byte, truncate bool, fn Iterator
 
 		fk = db.keyer.GenerateDeviceFileKey(fk, folder, vl.Versions[0].Device, name)
 
-		f, ok := db.getFileTrunc(fk, truncate)
+		f, ok := t.getFileTrunc(fk, truncate)
 		if !ok {
 			continue
 		}
@@ -504,7 +494,7 @@ func (db *instance) checkGlobals(folder []byte, meta *metadataTracker) {
 			newVL.Versions = append(newVL.Versions, version)
 
 			if i == 0 {
-				if fi, ok := db.getFile(fk); ok {
+				if fi, ok := t.getFileByKey(fk); ok {
 					meta.addFile(protocol.GlobalDeviceID, fi)
 				}
 			}
diff --git a/lib/db/schemaupdater.go b/lib/db/schemaupdater.go
index e6dee7427..ac1d05c08 100644
--- a/lib/db/schemaupdater.go
+++ b/lib/db/schemaupdater.go
@@ -203,7 +203,7 @@ func (db *schemaUpdater) updateSchema2to3() {
 			name := []byte(f.FileName())
 			dk = db.keyer.GenerateDeviceFileKey(dk, folder, protocol.LocalDeviceID[:], name)
 			var v protocol.Vector
-			haveFile, ok := db.getFileTrunc(dk, true)
+			haveFile, ok := t.getFileTrunc(dk, true)
 			if ok {
 				v = haveFile.FileVersion()
 			}
diff --git a/lib/db/set.go b/lib/db/set.go
index 948d563a2..40b523831 100644
--- a/lib/db/set.go
+++ b/lib/db/set.go
@@ -161,11 +161,9 @@ func (s *FileSet) Update(device protocol.DeviceID, fs []protocol.FileInfo) {
 		// filter slice according to https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
 		oldFs := fs
 		fs = fs[:0]
-		var dk []byte
 		folder := []byte(s.folder)
 		for _, nf := range oldFs {
-			dk = s.db.keyer.GenerateDeviceFileKey(dk, folder, device[:], []byte(osutil.NormalizedFilename(nf.Name)))
-			ef, ok := s.db.getFile(dk)
+			ef, ok := s.db.getFileDirty(folder, device[:], []byte(osutil.NormalizedFilename(nf.Name)))
 			if ok && ef.Version.Equal(nf.Version) && ef.IsInvalid() == nf.IsInvalid() {
 				continue
 			}
@@ -246,7 +244,7 @@ func (s *FileSet) WithPrefixedGlobalTruncated(prefix string, fn Iterator) {
 }
 
 func (s *FileSet) Get(device protocol.DeviceID, file string) (protocol.FileInfo, bool) {
-	f, ok := s.db.getFile(s.db.keyer.GenerateDeviceFileKey(nil, []byte(s.folder), device[:], []byte(osutil.NormalizedFilename(file))))
+	f, ok := s.db.getFileDirty([]byte(s.folder), device[:], []byte(osutil.NormalizedFilename(file)))
 	f.Name = osutil.NativeFilename(f.Name)
 	return f, ok
 }
diff --git a/lib/db/set_test.go b/lib/db/set_test.go
index d6174e69e..2abc32966 100644
--- a/lib/db/set_test.go
+++ b/lib/db/set_test.go
@@ -1363,6 +1363,108 @@ func TestCaseSensitive(t *testing.T) {
 	}
 }
 
+func TestSequenceIndex(t *testing.T) {
+	// This test attempts to verify correct operation of the sequence index.
+
+	// It's a stress test and needs to run for a long time, but we don't
+	// really have time for that in normal builds.
+	runtime := time.Minute
+	if testing.Short() {
+		runtime = time.Second
+	}
+
+	// Set up a db and a few files that we will manipulate.
+
+	ldb := db.OpenMemory()
+	s := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb)
+
+	local := []protocol.FileInfo{
+		{Name: filepath.FromSlash("banana"), Version: protocol.Vector{Counters: []protocol.Counter{{ID: myID, Value: 1000}}}},
+		{Name: filepath.FromSlash("pineapple"), Version: protocol.Vector{Counters: []protocol.Counter{{ID: myID, Value: 1000}}}},
+		{Name: filepath.FromSlash("orange"), Version: protocol.Vector{Counters: []protocol.Counter{{ID: myID, Value: 1000}}}},
+		{Name: filepath.FromSlash("apple"), Version: protocol.Vector{Counters: []protocol.Counter{{ID: myID, Value: 1000}}}},
+		{Name: filepath.FromSlash("jackfruit"), Version: protocol.Vector{Counters: []protocol.Counter{{ID: myID, Value: 1000}}}},
+	}
+
+	// Start a background routine that makes updates to these files as fast
+	// as it can. We always update the same files in the same order.
+
+	done := make(chan struct{})
+	defer close(done)
+
+	go func() {
+		for {
+			select {
+			case <-done:
+				return
+			default:
+			}
+
+			for i := range local {
+				local[i].Version = local[i].Version.Update(42)
+			}
+			s.Update(protocol.LocalDeviceID, local)
+		}
+	}()
+
+	// Start a routine to walk the sequence index and inspect the result.
+
+	seen := make(map[string]db.FileIntf)
+	latest := make([]db.FileIntf, 0, len(local))
+	var seq int64
+	t0 := time.Now()
+
+	for time.Since(t0) < runtime {
+		// Walk the changes since our last iteration. This should give us
+		// one instance each of the files that are changed all the time, or
+		// a subset of those files if we manage to run before a complete
+		// update has happened since our last iteration.
+		latest = latest[:0]
+		s.WithHaveSequence(seq+1, func(f db.FileIntf) bool {
+			seen[f.FileName()] = f
+			latest = append(latest, f)
+			seq = f.SequenceNo()
+			return true
+		})
+
+		// Calculate the spread in sequence number.
+		var max, min int64
+		for _, v := range seen {
+			s := v.SequenceNo()
+			if max == 0 || max < s {
+				max = s
+			}
+			if min == 0 || min > s {
+				min = s
+			}
+		}
+
+		// We shouldn't see a spread larger than the number of files, as
+		// that would mean we have missed updates. For example, if we were
+		// to see the following:
+		//
+		//   banana    N
+		//   pineapple N+1
+		//   orange    N+2
+		//   apple     N+10
+		//   jackfruit N+11
+		//
+		// that would mean that there have been updates to banana, pineapple
+		// and orange that we didn't see in this pass. If those files aren't
+		// updated again, those updates are permanently lost.
+		if max-min > int64(len(local)) {
+			for _, v := range seen {
+				t.Log("seen", v.FileName(), v.SequenceNo())
+			}
+			for _, v := range latest {
+				t.Log("latest", v.FileName(), v.SequenceNo())
+			}
+			t.Fatal("large spread")
+		}
+		time.Sleep(time.Millisecond)
+	}
+}
+
 func replace(fs *db.FileSet, device protocol.DeviceID, files []protocol.FileInfo) {
 	fs.Drop(device)
 	fs.Update(device, files)
diff --git a/lib/db/structs.go b/lib/db/structs.go
index e0ad73aa2..4472bbbfb 100644
--- a/lib/db/structs.go
+++ b/lib/db/structs.go
@@ -164,7 +164,7 @@ func (vl VersionList) String() string {
 // update brings the VersionList up to date with file. It returns the updated
 // VersionList, a potentially removed old FileVersion and its index, as well as
 // the index where the new FileVersion was inserted.
-func (vl VersionList) update(folder, device []byte, file protocol.FileInfo, db *instance) (_ VersionList, removedFV FileVersion, removedAt int, insertedAt int) {
+func (vl VersionList) update(folder, device []byte, file protocol.FileInfo, t readOnlyTransaction) (_ VersionList, removedFV FileVersion, removedAt int, insertedAt int) {
 	vl, removedFV, removedAt = vl.pop(device)
 
 	nv := FileVersion{
@@ -198,7 +198,7 @@ func (vl VersionList) update(folder, device []byte, file protocol.FileInfo, db *
 		// to determine the winner.)
 		//
 		// A surprise missing file entry here is counted as a win for us.
-		if of, ok := db.getFile(db.keyer.GenerateDeviceFileKey(nil, folder, vl.Versions[i].Device, []byte(file.Name))); !ok || file.WinsConflict(of) {
+		if of, ok := t.getFile(folder, vl.Versions[i].Device, []byte(file.Name)); !ok || file.WinsConflict(of) {
 			vl = vl.insertAt(i, nv)
 			return vl, removedFV, removedAt, i
 		}
diff --git a/lib/db/transactions.go b/lib/db/transactions.go
index b2fb75183..cc4eaec71 100644
--- a/lib/db/transactions.go
+++ b/lib/db/transactions.go
@@ -37,7 +37,32 @@ func (t readOnlyTransaction) close() {
 }
 
 func (t readOnlyTransaction) getFile(folder, device, file []byte) (protocol.FileInfo, bool) {
-	return t.db.getFile(t.db.keyer.GenerateDeviceFileKey(nil, folder, device, file))
+	return t.getFileByKey(t.db.keyer.GenerateDeviceFileKey(nil, folder, device, file))
+}
+
+func (t readOnlyTransaction) getFileByKey(key []byte) (protocol.FileInfo, bool) {
+	if f, ok := t.getFileTrunc(key, false); ok {
+		return f.(protocol.FileInfo), true
+	}
+	return protocol.FileInfo{}, false
+}
+
+func (t readOnlyTransaction) getFileTrunc(key []byte, trunc bool) (FileIntf, bool) {
+	bs, err := t.Get(key, nil)
+	if err == leveldb.ErrNotFound {
+		return nil, false
+	}
+	if err != nil {
+		l.Debugln("surprise error:", err)
+		return nil, false
+	}
+
+	f, err := unmarshalTrunc(bs, trunc)
+	if err != nil {
+		l.Debugln("unmarshal error:", err)
+		return nil, false
+	}
+	return f, true
 }
 
 // A readWriteTransaction is a readOnlyTransaction plus a batch for writes.
@@ -90,7 +115,7 @@ func (t readWriteTransaction) updateGlobal(gk, folder, device []byte, file proto
 	if svl, err := t.Get(gk, nil); err == nil {
 		fl.Unmarshal(svl) // Ignore error, continue with empty fl
 	}
-	fl, removedFV, removedAt, insertedAt := fl.update(folder, device, file, t.db)
+	fl, removedFV, removedAt, insertedAt := fl.update(folder, device, file, t.readOnlyTransaction)
 	if insertedAt == -1 {
 		l.Debugln("update global; same version, global unchanged")
 		return false
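
Note (not part of the patch): the sketch below is a minimal, self-contained
illustration of the pattern the fix adopts, written directly against
goleveldb - iterate an index and read the records it points at through the
same snapshot, instead of reading them "dirty" from the live database. The
key layout ("seq/", "file/") and everything else in it are invented for the
example; Syncthing's real code goes through keyer, readOnlyTransaction and
the getFile/getFileDirty split described in the commit message.

package main

import (
	"fmt"

	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/storage"
	"github.com/syndtr/goleveldb/leveldb/util"
)

func main() {
	// In-memory database, analogous to db.OpenMemory() in the tests.
	ldb, err := leveldb.Open(storage.NewMemStorage(), nil)
	if err != nil {
		panic(err)
	}
	defer ldb.Close()

	// A toy "sequence index": seq/<n> points at a file key, which holds the record.
	_ = ldb.Put([]byte("file/banana"), []byte("banana, version 1"), nil)
	_ = ldb.Put([]byte("seq/000001"), []byte("file/banana"), nil)

	// Take one snapshot and do both the index walk and the record reads
	// through it, so the two views cannot disagree.
	snap, err := ldb.GetSnapshot()
	if err != nil {
		panic(err)
	}
	defer snap.Release()

	it := snap.NewIterator(util.BytesPrefix([]byte("seq/")), nil)
	defer it.Release()

	for it.Next() {
		// Copy defensively: the iterator reuses its value buffer between Next calls.
		fileKey := append([]byte{}, it.Value()...)

		// Reading via snap rather than ldb is the point of the fix: a
		// concurrent writer cannot make this lookup observe a record newer
		// than the index entry we just iterated.
		rec, err := snap.Get(fileKey, nil)
		if err != nil {
			// Missing record: skip it, as the patched withHaveSequence does.
			continue
		}
		fmt.Printf("%s -> %s\n", it.Key(), rec)
	}
}

The renaming in the patch encodes the same distinction at the call site:
getFile, getFileByKey and getFileTrunc on readOnlyTransaction read through
the transaction's snapshot, while getFileDirty on the database reads the
live state, making the dirty read explicit wherever it is still used.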