lib/db: Schema update to repair sequence index (ref #6304) (#6350)

This commit is contained in:
Simon Frei 2020-02-22 09:36:59 +01:00 committed by Jakob Borg
parent a4bd4d118a
commit 0fb2cd52ff
2 changed files with 93 additions and 27 deletions

View File

@ -7,9 +7,11 @@
package db package db
import ( import (
"bytes"
"fmt" "fmt"
"strings" "strings"
"github.com/syncthing/syncthing/lib/db/backend"
"github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/protocol"
) )
@ -23,11 +25,17 @@ import (
// 6: v0.14.50 // 6: v0.14.50
// 7: v0.14.53 // 7: v0.14.53
// 8: v1.4.0 // 8: v1.4.0
// 9: v1.4.0
const ( const (
dbVersion = 8 dbVersion = 9
dbMinSyncthingVersion = "v1.4.0" dbMinSyncthingVersion = "v1.4.0"
) )
var (
errFolderIdxMissing = fmt.Errorf("folder db index missing")
errDeviceIdxMissing = fmt.Errorf("device db index missing")
)
type databaseDowngradeError struct { type databaseDowngradeError struct {
minSyncthingVersion string minSyncthingVersion string
} }
@ -80,7 +88,7 @@ func (db *schemaUpdater) updateSchema() error {
{5, db.updateSchemaTo5}, {5, db.updateSchemaTo5},
{6, db.updateSchema5to6}, {6, db.updateSchema5to6},
{7, db.updateSchema6to7}, {7, db.updateSchema6to7},
{8, db.updateSchema7to8}, {9, db.updateSchemato9},
} }
for _, m := range migrations { for _, m := range migrations {
@ -421,8 +429,9 @@ func (db *schemaUpdater) updateSchema6to7(_ int) error {
return t.Commit() return t.Commit()
} }
func (db *schemaUpdater) updateSchema7to8(_ int) error { func (db *schemaUpdater) updateSchemato9(prev int) error {
// Loads and rewrites all files with blocks, to deduplicate block lists. // Loads and rewrites all files with blocks, to deduplicate block lists.
// Checks for missing or incorrect sequence entries and rewrites those.
t, err := db.newReadWriteTransaction() t, err := db.newReadWriteTransaction()
if err != nil { if err != nil {
@ -430,15 +439,59 @@ func (db *schemaUpdater) updateSchema7to8(_ int) error {
} }
defer t.close() defer t.close()
var sk []byte
it, err := t.NewPrefixIterator([]byte{KeyTypeDevice}) it, err := t.NewPrefixIterator([]byte{KeyTypeDevice})
if err != nil { if err != nil {
return err return err
} }
metas := make(map[string]*metadataTracker)
for it.Next() { for it.Next() {
var fi protocol.FileInfo var fi protocol.FileInfo
if err := fi.Unmarshal(it.Value()); err != nil { if err := fi.Unmarshal(it.Value()); err != nil {
return err return err
} }
device, ok := t.keyer.DeviceFromDeviceFileKey(it.Key())
if !ok {
return errDeviceIdxMissing
}
if bytes.Equal(device, protocol.LocalDeviceID[:]) {
folder, ok := t.keyer.FolderFromDeviceFileKey(it.Key())
if !ok {
return errFolderIdxMissing
}
if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
return err
}
switch dk, err := t.Get(sk); {
case err != nil:
if !backend.IsNotFound(err) {
return err
}
fallthrough
case !bytes.Equal(it.Key(), dk):
folderStr := string(folder)
meta, ok := metas[folderStr]
if !ok {
meta = loadMetadataTracker(db.Lowlevel, folderStr)
metas[folderStr] = meta
}
fi.Sequence = meta.nextLocalSeq()
if sk, err = t.keyer.GenerateSequenceKey(sk, folder, fi.Sequence); err != nil {
return err
}
if err := t.Put(sk, it.Key()); err != nil {
return err
}
if err := t.putFile(it.Key(), fi); err != nil {
return err
}
continue
}
}
if prev == 8 {
// The transition to 8 already did the changes below.
continue
}
if fi.Blocks == nil { if fi.Blocks == nil {
continue continue
} }
@ -451,6 +504,12 @@ func (db *schemaUpdater) updateSchema7to8(_ int) error {
return err return err
} }
for folder, meta := range metas {
if err := meta.toDB(t, []byte(folder)); err != nil {
return err
}
}
db.recordTime(blockGCTimeKey) db.recordTime(blockGCTimeKey)
return t.Commit() return t.Commit()

View File

@ -71,66 +71,68 @@ func init() {
} }
func NewFileSet(folder string, fs fs.Filesystem, db *Lowlevel) *FileSet { func NewFileSet(folder string, fs fs.Filesystem, db *Lowlevel) *FileSet {
var s = &FileSet{ return &FileSet{
folder: folder, folder: folder,
fs: fs, fs: fs,
db: db, db: db,
meta: newMetadataTracker(), meta: loadMetadataTracker(db, folder),
updateMutex: sync.NewMutex(), updateMutex: sync.NewMutex(),
} }
}
recalc := func() *FileSet { func loadMetadataTracker(db *Lowlevel, folder string) *metadataTracker {
if err := s.recalcMeta(); backend.IsClosed(err) { meta := newMetadataTracker()
recalc := func() *metadataTracker {
if err := recalcMeta(meta, db, folder); backend.IsClosed(err) {
return nil return nil
} else if err != nil { } else if err != nil {
panic(err) panic(err)
} }
return s return meta
} }
if err := s.meta.fromDB(db, []byte(folder)); err != nil { if err := meta.fromDB(db, []byte(folder)); err != nil {
l.Infof("No stored folder metadata for %q; recalculating", folder) l.Infof("No stored folder metadata for %q; recalculating", folder)
return recalc() return recalc()
} }
if metaOK := s.verifyLocalSequence(); !metaOK { if metaOK := verifyLocalSequence(meta, db, folder); !metaOK {
l.Infof("Stored folder metadata for %q is out of date after crash; recalculating", folder) l.Infof("Stored folder metadata for %q is out of date after crash; recalculating", folder)
return recalc() return recalc()
} }
if age := time.Since(s.meta.Created()); age > databaseRecheckInterval { if age := time.Since(meta.Created()); age > databaseRecheckInterval {
l.Infof("Stored folder metadata for %q is %v old; recalculating", folder, age) l.Infof("Stored folder metadata for %q is %v old; recalculating", folder, age)
return recalc() return recalc()
} }
return s return meta
} }
func (s *FileSet) recalcMeta() error { func recalcMeta(meta *metadataTracker, db *Lowlevel, folder string) error {
s.meta = newMetadataTracker() if err := db.checkGlobals([]byte(folder), meta); err != nil {
if err := s.db.checkGlobals([]byte(s.folder), s.meta); err != nil {
return err return err
} }
t, err := s.db.newReadWriteTransaction() t, err := db.newReadWriteTransaction()
if err != nil { if err != nil {
return err return err
} }
defer t.close() defer t.close()
var deviceID protocol.DeviceID var deviceID protocol.DeviceID
err = t.withAllFolderTruncated([]byte(s.folder), func(device []byte, f FileInfoTruncated) bool { err = t.withAllFolderTruncated([]byte(folder), func(device []byte, f FileInfoTruncated) bool {
copy(deviceID[:], device) copy(deviceID[:], device)
s.meta.addFile(deviceID, f) meta.addFile(deviceID, f)
return true return true
}) })
if err != nil { if err != nil {
return err return err
} }
s.meta.SetCreated() meta.SetCreated()
if err := s.meta.toDB(t, []byte(s.folder)); err != nil { if err := meta.toDB(t, []byte(folder)); err != nil {
return err return err
} }
return t.Commit() return t.Commit()
@ -138,7 +140,7 @@ func (s *FileSet) recalcMeta() error {
// Verify the local sequence number from actual sequence entries. Returns // Verify the local sequence number from actual sequence entries. Returns
// true if it was all good, or false if a fixup was necessary. // true if it was all good, or false if a fixup was necessary.
func (s *FileSet) verifyLocalSequence() bool { func verifyLocalSequence(meta *metadataTracker, db *Lowlevel, folder string) bool {
// Walk the sequence index from the current (supposedly) highest // Walk the sequence index from the current (supposedly) highest
// sequence number and raise the alarm if we get anything. This recovers // sequence number and raise the alarm if we get anything. This recovers
// from the occasion where we have written sequence entries to disk but // from the occasion where we have written sequence entries to disk but
@ -149,15 +151,20 @@ func (s *FileSet) verifyLocalSequence() bool {
// number than we've actually seen and receive some duplicate updates // number than we've actually seen and receive some duplicate updates
// and then be in sync again. // and then be in sync again.
curSeq := s.meta.Sequence(protocol.LocalDeviceID) curSeq := meta.Sequence(protocol.LocalDeviceID)
snap := s.Snapshot() t, err := db.newReadOnlyTransaction()
if err != nil {
panic(err)
}
ok := true ok := true
snap.WithHaveSequence(curSeq+1, func(fi FileIntf) bool { if err := t.withHaveSequence([]byte(folder), curSeq+1, func(fi FileIntf) bool {
ok = false // we got something, which we should not have ok = false // we got something, which we should not have
return false return false
}) }); err != nil && !backend.IsClosed(err) {
snap.Release() panic(err)
}
t.close()
return ok return ok
} }