lib/db, lib/model: Allow needing invalid files (fixes #7474) (#7476)

This commit is contained in:
Simon Frei 2021-03-15 07:58:01 +01:00 committed by GitHub
parent bb886868d2
commit 273ee09925
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 125 additions and 46 deletions

View File

@ -846,6 +846,10 @@ func (db *Lowlevel) getMetaAndCheck(folder string) (*metadataTracker, error) {
db.gcMut.RLock()
defer db.gcMut.RUnlock()
return db.getMetaAndCheckGCLocked(folder)
}
func (db *Lowlevel) getMetaAndCheckGCLocked(folder string) (*metadataTracker, error) {
fixed, err := db.checkLocalNeed([]byte(folder))
if err != nil {
return nil, fmt.Errorf("checking local need: %w", err)

View File

@ -21,7 +21,7 @@ import (
// do not put restrictions on downgrades (e.g. for repairs after a bugfix).
const (
dbVersion = 14
dbMigrationVersion = 15
dbMigrationVersion = 16
dbMinSyncthingVersion = "v1.9.0"
)
@ -104,6 +104,7 @@ func (db *schemaUpdater) updateSchema() error {
{13, 13, "v1.7.0", db.updateSchemaTo13},
{14, 14, "v1.9.0", db.updateSchemaTo14},
{14, 15, "v1.9.0", db.migration15},
{14, 16, "v1.9.0", db.checkRepairMigration},
}
for _, m := range migrations {
@ -782,6 +783,16 @@ func (db *schemaUpdater) migration15(_ int) error {
return nil
}
func (db *schemaUpdater) checkRepairMigration(_ int) error {
for _, folder := range db.ListFolders() {
_, err := db.getMetaAndCheckGCLocked(folder)
if err != nil {
return err
}
}
return nil
}
func (db *schemaUpdater) rewriteGlobals(t readWriteTransaction) error {
it, err := t.NewPrefixIterator([]byte{KeyTypeGlobal})
if err != nil {

View File

@ -469,9 +469,10 @@ func TestNeedWithInvalid(t *testing.T) {
}
expectedNeed := fileList{
protocol.FileInfo{Name: "b", Version: protocol.Vector{Counters: []protocol.Counter{{ID: myID, Value: 1001}}}, Blocks: genBlocks(2)},
protocol.FileInfo{Name: "c", Version: protocol.Vector{Counters: []protocol.Counter{{ID: myID, Value: 1002}}}, Blocks: genBlocks(7)},
protocol.FileInfo{Name: "d", Version: protocol.Vector{Counters: []protocol.Counter{{ID: myID, Value: 1003}}}, Blocks: genBlocks(7)},
remote0Have[0],
remote1Have[0],
remote0Have[2],
remote1Have[2],
}
replace(s, protocol.LocalDeviceID, localHave)

View File

@ -762,10 +762,8 @@ func (t readWriteTransaction) updateLocalNeed(keyBuf, folder, name []byte, add b
}
func Need(global FileVersion, haveLocal bool, localVersion protocol.Vector) bool {
// We never need an invalid file or a file without a valid version (just
// another way of expressing "invalid", really, until we fix that
// part...).
if global.IsInvalid() || global.Version.IsEmpty() {
// We never need a file without a valid version.
if global.Version.IsEmpty() {
return false
}
// We don't need a deleted file if we don't have it.

View File

@ -62,33 +62,34 @@ func (f *fakeConnection) DownloadProgress(_ context.Context, folder string, upda
})
}
func (f *fakeConnection) addFileLocked(name string, flags uint32, ftype protocol.FileInfoType, data []byte, version protocol.Vector) {
func (f *fakeConnection) addFileLocked(name string, flags uint32, ftype protocol.FileInfoType, data []byte, version protocol.Vector, localFlags uint32) {
blockSize := protocol.BlockSize(int64(len(data)))
blocks, _ := scanner.Blocks(context.TODO(), bytes.NewReader(data), blockSize, int64(len(data)), nil, true)
if ftype == protocol.FileInfoTypeFile || ftype == protocol.FileInfoTypeDirectory {
f.files = append(f.files, protocol.FileInfo{
Name: name,
Type: ftype,
Size: int64(len(data)),
ModifiedS: time.Now().Unix(),
Permissions: flags,
Version: version,
Sequence: time.Now().UnixNano(),
RawBlockSize: blockSize,
Blocks: blocks,
})
} else {
// Symlink
f.files = append(f.files, protocol.FileInfo{
file := protocol.FileInfo{
Name: name,
Type: ftype,
Version: version,
Sequence: time.Now().UnixNano(),
SymlinkTarget: string(data),
NoPermissions: true,
})
LocalFlags: localFlags,
}
switch ftype {
case protocol.FileInfoTypeFile, protocol.FileInfoTypeDirectory:
file.ModifiedS = time.Now().Unix()
file.Permissions = flags
if ftype == protocol.FileInfoTypeFile {
file.Size = int64(len(data))
file.RawBlockSize = blockSize
file.Blocks = blocks
}
default: // Symlink
file.Name = name
file.Type = ftype
file.Version = version
file.SymlinkTarget = string(data)
file.NoPermissions = true
}
f.files = append(f.files, file)
if f.fileData == nil {
f.fileData = make(map[string][]byte)
@ -96,13 +97,22 @@ func (f *fakeConnection) addFileLocked(name string, flags uint32, ftype protocol
f.fileData[name] = data
}
func (f *fakeConnection) addFileWithLocalFlags(name string, ftype protocol.FileInfoType, localFlags uint32) {
f.mut.Lock()
defer f.mut.Unlock()
var version protocol.Vector
version = version.Update(f.id.Short())
f.addFileLocked(name, 0, ftype, nil, version, localFlags)
}
func (f *fakeConnection) addFile(name string, flags uint32, ftype protocol.FileInfoType, data []byte) {
f.mut.Lock()
defer f.mut.Unlock()
var version protocol.Vector
version = version.Update(f.id.Short())
f.addFileLocked(name, flags, ftype, data, version)
f.addFileLocked(name, flags, ftype, data, version, 0)
}
func (f *fakeConnection) updateFile(name string, flags uint32, ftype protocol.FileInfoType, data []byte) {
@ -112,7 +122,7 @@ func (f *fakeConnection) updateFile(name string, flags uint32, ftype protocol.Fi
for i, fi := range f.files {
if fi.Name == name {
f.files = append(f.files[:i], f.files[i+1:]...)
f.addFileLocked(name, flags, ftype, data, fi.Version.Update(f.id.Short()))
f.addFileLocked(name, flags, ftype, data, fi.Version.Update(f.id.Short()), 0)
return
}
}
@ -137,7 +147,11 @@ func (f *fakeConnection) deleteFile(name string) {
}
func (f *fakeConnection) sendIndexUpdate() {
f.model.IndexUpdate(f.id, f.folder, f.files)
toSend := make([]protocol.FileInfo, len(f.files))
for i := range f.files {
toSend[i] = prepareFileInfoForIndex(f.files[i])
}
f.model.IndexUpdate(f.id, f.folder, toSend)
}
func addFakeConn(m *testModel, dev protocol.DeviceID) *fakeConnection {

View File

@ -357,6 +357,11 @@ func (f *sendReceiveFolder) processNeeded(snap *db.Snapshot, dbUpdateChan chan<-
changed--
}
case file.IsInvalid():
// Global invalid file just exists for need accounting
l.Debugln(f, "Handling global invalid item", file)
dbUpdateChan <- dbUpdateJob{file, dbUpdateInvalidate}
case file.IsDeleted():
if file.IsDirectory() {
// Perform directory deletions at the end, as we may have

View File

@ -178,18 +178,7 @@ func (s *indexSender) sendIndexTo(ctx context.Context) error {
return true
}
// Mark the file as invalid if any of the local bad stuff flags are set.
f.RawInvalid = f.IsInvalid()
// If the file is marked LocalReceive (i.e., changed locally on a
// receive only folder) we do not want it to ever become the
// globally best version, invalid or not.
if f.IsReceiveOnlyChanged() {
f.Version = protocol.Vector{}
}
// never sent externally
f.LocalFlags = 0
f.VersionHash = nil
f = prepareFileInfoForIndex(f)
previousWasDelete = f.IsDeleted()
@ -211,6 +200,21 @@ func (s *indexSender) sendIndexTo(ctx context.Context) error {
return err
}
func prepareFileInfoForIndex(f protocol.FileInfo) protocol.FileInfo {
// Mark the file as invalid if any of the local bad stuff flags are set.
f.RawInvalid = f.IsInvalid()
// If the file is marked LocalReceive (i.e., changed locally on a
// receive only folder) we do not want it to ever become the
// globally best version, invalid or not.
if f.IsReceiveOnlyChanged() {
f.Version = protocol.Vector{}
}
// never sent externally
f.LocalFlags = 0
f.VersionHash = nil
return f
}
func (s *indexSender) String() string {
return fmt.Sprintf("indexSender@%p for %s to %s at %s", s, s.folder, s.conn.ID(), s.conn)
}

View File

@ -1399,3 +1399,45 @@ func TestRequestReceiveEncryptedLocalNoSend(t *testing.T) {
t.Fatal("timed out before receiving index")
}
}
func TestRequestIssue7474(t *testing.T) {
// Repro for https://github.com/syncthing/syncthing/issues/7474
// Devices A, B and C. B connected to A and C, but not A to C.
// A has valid file, B ignores it.
// In the test C is local, and B is the fake connection.
done := make(chan struct{})
defer close(done)
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
tfs := fcfg.Filesystem()
defer cleanupModelAndRemoveDir(m, tfs.URI())
indexChan := make(chan []protocol.FileInfo)
fc.setIndexFn(func(ctx context.Context, folder string, fs []protocol.FileInfo) error {
select {
case indexChan <- fs:
case <-done:
case <-ctx.Done():
}
return nil
})
name := "foo"
fc.addFileWithLocalFlags(name, protocol.FileInfoTypeFile, protocol.FlagLocalIgnored)
fc.sendIndexUpdate()
select {
case <-time.After(10 * time.Second):
t.Fatal("timed out before receiving index")
case fs := <-indexChan:
if len(fs) != 1 {
t.Fatalf("Expected one file in index, got %v", len(fs))
}
if !fs[0].IsInvalid() {
t.Error("Expected invalid file")
}
}
}