lib: Repair sequence inconsistencies (#6367)

This commit is contained in:
Simon Frei 2020-03-18 17:34:46 +01:00 committed by GitHub
parent 80107d5f5e
commit cc2a55892f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 321 additions and 65 deletions

View File

@ -7,6 +7,7 @@
package db package db
import ( import (
"bytes"
"testing" "testing"
"github.com/syncthing/syncthing/lib/db/backend" "github.com/syncthing/syncthing/lib/db/backend"
@ -249,6 +250,172 @@ func TestUpdate0to3(t *testing.T) {
} }
} }
// TestRepairSequence checks that a few hand-crafted messed-up sequence entries get fixed.
func TestRepairSequence(t *testing.T) {
db := NewLowlevel(backend.OpenMemory())
defer db.Close()
folderStr := "test"
folder := []byte(folderStr)
id := protocol.LocalDeviceID
short := protocol.LocalDeviceID.Short()
files := []protocol.FileInfo{
{Name: "fine"},
{Name: "duplicate"},
{Name: "missing"},
{Name: "overwriting"},
{Name: "inconsistent"},
}
for i, f := range files {
files[i].Version = f.Version.Update(short)
}
trans, err := db.newReadWriteTransaction()
if err != nil {
t.Fatal(err)
}
defer trans.close()
addFile := func(f protocol.FileInfo, seq int64) {
dk, err := trans.keyer.GenerateDeviceFileKey(nil, folder, id[:], []byte(f.Name))
if err != nil {
t.Fatal(err)
}
if err := trans.putFile(dk, f); err != nil {
t.Fatal(err)
}
sk, err := trans.keyer.GenerateSequenceKey(nil, folder, seq)
if err != nil {
t.Fatal(err)
}
if err := trans.Put(sk, dk); err != nil {
t.Fatal(err)
}
}
// Plain normal entry
var seq int64 = 1
files[0].Sequence = 1
addFile(files[0], seq)
// Second entry once updated with original sequence still in place
f := files[1]
f.Sequence = int64(len(files) + 1)
addFile(f, f.Sequence)
// Original sequence entry
seq++
sk, err := trans.keyer.GenerateSequenceKey(nil, folder, seq)
if err != nil {
t.Fatal(err)
}
dk, err := trans.keyer.GenerateDeviceFileKey(nil, folder, id[:], []byte(f.Name))
if err != nil {
t.Fatal(err)
}
if err := trans.Put(sk, dk); err != nil {
t.Fatal(err)
}
// File later overwritten thus missing sequence entry
seq++
files[2].Sequence = seq
addFile(files[2], seq)
// File overwriting previous sequence entry (no seq bump)
seq++
files[3].Sequence = seq
addFile(files[3], seq)
// Inconistent file
seq++
files[4].Sequence = 101
addFile(files[4], seq)
// And a sequence entry pointing at nothing because why not
sk, err = trans.keyer.GenerateSequenceKey(nil, folder, 100001)
if err != nil {
t.Fatal(err)
}
dk, err = trans.keyer.GenerateDeviceFileKey(nil, folder, id[:], []byte("nonexisting"))
if err != nil {
t.Fatal(err)
}
if err := trans.Put(sk, dk); err != nil {
t.Fatal(err)
}
if err := trans.Commit(); err != nil {
t.Fatal(err)
}
// Loading the metadata for the first time means a "re"calculation happens,
// along which the sequences get repaired too.
db.gcMut.RLock()
_ = loadMetadataTracker(db, folderStr)
db.gcMut.RUnlock()
if err != nil {
t.Fatal(err)
}
// Check the db
ro, err := db.newReadOnlyTransaction()
if err != nil {
t.Fatal(err)
}
defer ro.close()
it, err := ro.NewPrefixIterator([]byte{KeyTypeDevice})
if err != nil {
t.Fatal(err)
}
defer it.Release()
for it.Next() {
fi, err := ro.unmarshalTrunc(it.Value(), true)
if err != nil {
t.Fatal(err)
}
if sk, err = ro.keyer.GenerateSequenceKey(sk, folder, fi.SequenceNo()); err != nil {
t.Fatal(err)
}
dk, err := ro.Get(sk)
if backend.IsNotFound(err) {
t.Error("Missing sequence entry for", fi.FileName())
} else if err != nil {
t.Fatal(err)
}
if !bytes.Equal(it.Key(), dk) {
t.Errorf("Wrong key for %v, expected %s, got %s", f.FileName(), it.Key(), dk)
}
}
if err := it.Error(); err != nil {
t.Fatal(err)
}
it.Release()
it, err = ro.NewPrefixIterator([]byte{KeyTypeSequence})
if err != nil {
t.Fatal(err)
}
defer it.Release()
for it.Next() {
fi, ok, err := ro.getFileTrunc(it.Value(), true)
if err != nil {
t.Fatal(err)
}
seq := ro.keyer.SequenceFromSequenceKey(it.Key())
if !ok {
t.Errorf("Sequence entry %v points at nothing", seq)
} else if fi.SequenceNo() != seq {
t.Errorf("Inconsistent sequence entry for %v: %v != %v", fi.FileName(), fi.SequenceNo(), seq)
}
}
if err := it.Error(); err != nil {
t.Fatal(err)
}
it.Release()
}
func TestDowngrade(t *testing.T) { func TestDowngrade(t *testing.T) {
db := NewLowlevel(backend.OpenMemory()) db := NewLowlevel(backend.OpenMemory())
defer db.Close() defer db.Close()

View File

@ -121,6 +121,10 @@ func (k deviceFileKey) WithoutNameAndDevice() []byte {
return k[:keyPrefixLen+keyFolderLen] return k[:keyPrefixLen+keyFolderLen]
} }
func (k deviceFileKey) WithoutName() []byte {
return k[:keyPrefixLen+keyFolderLen+keyDeviceLen]
}
func (k defaultKeyer) GenerateDeviceFileKey(key, folder, device, name []byte) (deviceFileKey, error) { func (k defaultKeyer) GenerateDeviceFileKey(key, folder, device, name []byte) (deviceFileKey, error) {
folderID, err := k.folderIdx.ID(folder) folderID, err := k.folderIdx.ID(folder)
if err != nil { if err != nil {

View File

@ -623,6 +623,118 @@ func (db *Lowlevel) gcIndirect() error {
return db.Compact() return db.Compact()
} }
// repairSequenceGCLocked makes sure the sequence numbers in the sequence keys
// match those in the corresponding file entries. It returns the amount of fixed
// entries.
func (db *Lowlevel) repairSequenceGCLocked(folderStr string, meta *metadataTracker) (int, error) {
t, err := db.newReadWriteTransaction()
if err != nil {
return 0, err
}
defer t.close()
fixed := 0
folder := []byte(folderStr)
// First check that every file entry has a matching sequence entry
// (this was previously db schema upgrade to 9).
dk, err := t.keyer.GenerateDeviceFileKey(nil, folder, protocol.LocalDeviceID[:], nil)
if err != nil {
return 0, err
}
it, err := t.NewPrefixIterator(dk.WithoutName())
if err != nil {
return 0, err
}
defer it.Release()
var sk sequenceKey
for it.Next() {
intf, err := t.unmarshalTrunc(it.Value(), true)
if err != nil {
return 0, err
}
fi := intf.(FileInfoTruncated)
if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
return 0, err
}
switch dk, err = t.Get(sk); {
case err != nil:
if !backend.IsNotFound(err) {
return 0, err
}
fallthrough
case !bytes.Equal(it.Key(), dk):
fixed++
fi.Sequence = meta.nextLocalSeq()
if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
return 0, err
}
if err := t.Put(sk, it.Key()); err != nil {
return 0, err
}
if err := t.putFile(it.Key(), fi.copyToFileInfo()); err != nil {
return 0, err
}
}
if err := t.Checkpoint(func() error {
return meta.toDB(t, folder)
}); err != nil {
return 0, err
}
}
if err := it.Error(); err != nil {
return 0, err
}
it.Release()
// Secondly check there's no sequence entries pointing at incorrect things.
sk, err = t.keyer.GenerateSequenceKey(sk, folder, 0)
if err != nil {
return 0, err
}
it, err = t.NewPrefixIterator(sk.WithoutSequence())
if err != nil {
return 0, err
}
defer it.Release()
for it.Next() {
// Check that the sequence from the key matches the
// sequence in the file.
fi, ok, err := t.getFileTrunc(it.Value(), true)
if err != nil {
return 0, err
}
if ok {
if seq := t.keyer.SequenceFromSequenceKey(it.Key()); seq == fi.SequenceNo() {
continue
}
}
// Either the file is missing or has a different sequence number
fixed++
if err := t.Delete(it.Key()); err != nil {
return 0, err
}
}
if err := it.Error(); err != nil {
return 0, err
}
it.Release()
if err := meta.toDB(t, folder); err != nil {
return 0, err
}
return fixed, t.Commit()
}
// unchanged checks if two files are the same and thus don't need to be updated. // unchanged checks if two files are the same and thus don't need to be updated.
// Local flags or the invalid bit might change without the version // Local flags or the invalid bit might change without the version
// being bumped. // being bumped.

View File

@ -7,8 +7,6 @@
package db package db
import ( import (
"bytes"
"errors"
"fmt" "fmt"
"strings" "strings"
@ -20,23 +18,15 @@ import (
// 0: v0.14.0 // 0: v0.14.0
// 1: v0.14.46 // 1: v0.14.46
// 2: v0.14.48 // 2: v0.14.48
// 3: v0.14.49 // 3-5: v0.14.49
// 4: v0.14.49
// 5: v0.14.49
// 6: v0.14.50 // 6: v0.14.50
// 7: v0.14.53 // 7: v0.14.53
// 8: v1.4.0 // 8-9: v1.4.0
// 9: v1.4.0
const ( const (
dbVersion = 9 dbVersion = 9
dbMinSyncthingVersion = "v1.4.0" dbMinSyncthingVersion = "v1.4.0"
) )
var (
errFolderIdxMissing = errors.New("folder db index missing")
errDeviceIdxMissing = errors.New("device db index missing")
)
type databaseDowngradeError struct { type databaseDowngradeError struct {
minSyncthingVersion string minSyncthingVersion string
} }
@ -445,12 +435,10 @@ func (db *schemaUpdater) updateSchemato9(prev int) error {
} }
defer t.close() defer t.close()
var sk []byte
it, err := t.NewPrefixIterator([]byte{KeyTypeDevice}) it, err := t.NewPrefixIterator([]byte{KeyTypeDevice})
if err != nil { if err != nil {
return err return err
} }
metas := make(map[string]*metadataTracker)
for it.Next() { for it.Next() {
intf, err := t.unmarshalTrunc(it.Value(), false) intf, err := t.unmarshalTrunc(it.Value(), false)
if backend.IsNotFound(err) { if backend.IsNotFound(err) {
@ -466,48 +454,6 @@ func (db *schemaUpdater) updateSchemato9(prev int) error {
return err return err
} }
fi := intf.(protocol.FileInfo) fi := intf.(protocol.FileInfo)
device, ok := t.keyer.DeviceFromDeviceFileKey(it.Key())
if !ok {
return errDeviceIdxMissing
}
if bytes.Equal(device, protocol.LocalDeviceID[:]) {
folder, ok := t.keyer.FolderFromDeviceFileKey(it.Key())
if !ok {
return errFolderIdxMissing
}
if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
return err
}
switch dk, err := t.Get(sk); {
case err != nil:
if !backend.IsNotFound(err) {
return err
}
fallthrough
case !bytes.Equal(it.Key(), dk):
folderStr := string(folder)
meta, ok := metas[folderStr]
if !ok {
meta = loadMetadataTracker(db.Lowlevel, folderStr)
metas[folderStr] = meta
}
fi.Sequence = meta.nextLocalSeq()
if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
return err
}
if err := t.Put(sk, it.Key()); err != nil {
return err
}
if err := t.putFile(it.Key(), fi); err != nil {
return err
}
continue
}
}
if prev == 8 {
// The transition to 8 already did the changes below.
continue
}
if fi.Blocks == nil { if fi.Blocks == nil {
continue continue
} }
@ -520,12 +466,6 @@ func (db *schemaUpdater) updateSchemato9(prev int) error {
return err return err
} }
for folder, meta := range metas {
if err := meta.toDB(t, []byte(folder)); err != nil {
return err
}
}
db.recordTime(indirectGCTimeKey) db.recordTime(indirectGCTimeKey)
return t.Commit() return t.Commit()

View File

@ -82,7 +82,16 @@ func NewFileSet(folder string, fs fs.Filesystem, db *Lowlevel) *FileSet {
func loadMetadataTracker(db *Lowlevel, folder string) *metadataTracker { func loadMetadataTracker(db *Lowlevel, folder string) *metadataTracker {
recalc := func() *metadataTracker { recalc := func() *metadataTracker {
db.gcMut.RLock()
defer db.gcMut.RUnlock()
meta, err := recalcMeta(db, folder) meta, err := recalcMeta(db, folder)
if err == nil {
var fixed int
fixed, err = db.repairSequenceGCLocked(folder, meta)
if fixed != 0 {
l.Infoln("Repaired %v sequence entries in database", fixed)
}
}
if backend.IsClosed(err) { if backend.IsClosed(err) {
return nil return nil
} else if err != nil { } else if err != nil {
@ -531,6 +540,18 @@ func (s *FileSet) ListDevices() []protocol.DeviceID {
return s.meta.devices() return s.meta.devices()
} }
func (s *FileSet) RepairSequence() (int, error) {
s.updateAndGCMutexLock() // Ensures consistent locking order
defer s.updateMutex.Unlock()
defer s.db.gcMut.RUnlock()
return s.db.repairSequenceGCLocked(s.folder, s.meta)
}
func (s *FileSet) updateAndGCMutexLock() {
s.updateMutex.Lock()
s.db.gcMut.RLock()
}
// DropFolder clears out all information related to the given folder from the // DropFolder clears out all information related to the given folder from the
// database. // database.
func DropFolder(db *Lowlevel, folder string) { func DropFolder(db *Lowlevel, folder string) {

View File

@ -1989,9 +1989,21 @@ func (s *indexSender) sendIndexTo(ctx context.Context) error {
if fi.SequenceNo() < s.prevSequence+1 { if fi.SequenceNo() < s.prevSequence+1 {
panic(fmt.Sprintln("sequence lower than requested, got:", fi.SequenceNo(), ", asked to start at:", s.prevSequence+1)) panic(fmt.Sprintln("sequence lower than requested, got:", fi.SequenceNo(), ", asked to start at:", s.prevSequence+1))
} }
if f.Sequence > 0 && fi.SequenceNo() <= f.Sequence {
panic(fmt.Sprintln("non-increasing sequence, current:", fi.SequenceNo(), "<= previous:", f.Sequence))
} }
if f.Sequence > 0 && fi.SequenceNo() <= f.Sequence {
l.Warnln("Non-increasing sequence detected: Checking and repairing the db...")
// Abort this round of index sending - the next one will pick
// up from the last successful one with the repeaired db.
defer func() {
if fixed, dbErr := s.fset.RepairSequence(); dbErr != nil {
l.Warnln("Failed repairing sequence entries:", dbErr)
panic("Failed repairing sequence entries")
} else {
l.Infoln("Repaired %v sequence entries in database", fixed)
}
}()
return false
} }
f = fi.(protocol.FileInfo) f = fi.(protocol.FileInfo)