lib: Handle metadata changes for send-only folders (fixes #4616, fixes #4627) (#4750)

Unignored files are marked as conflicting while scanning, which is then resolved
in the subsequent pull. Automatically reconciles needed items on send-only
folders, if they do not actually differ except for internal metadata.
This commit is contained in:
Simon Frei 2018-02-25 09:39:00 +01:00 committed by Jakob Borg
parent 5822222c74
commit 158859a1e2
10 changed files with 425 additions and 241 deletions

View File

@ -324,7 +324,7 @@ func (db *Instance) availability(folder, file []byte) []protocol.DeviceID {
return devices return devices
} }
func (db *Instance) withNeed(folder, device []byte, truncate bool, needAllInvalid bool, fn Iterator) { func (db *Instance) withNeed(folder, device []byte, truncate bool, fn Iterator) {
t := db.newReadOnlyTransaction() t := db.newReadOnlyTransaction()
defer t.close() defer t.close()
@ -351,12 +351,6 @@ func (db *Instance) withNeed(folder, device []byte, truncate bool, needAllInvali
if bytes.Equal(v.Device, device) { if bytes.Equal(v.Device, device) {
have = true have = true
haveFileVersion = v haveFileVersion = v
// We need invalid files regardless of version when
// ignore patterns changed
if v.Invalid && needAllInvalid {
need = true
break
}
// XXX: This marks Concurrent (i.e. conflicting) changes as // XXX: This marks Concurrent (i.e. conflicting) changes as
// needs. Maybe we should do that, but it needs special // needs. Maybe we should do that, but it needs special
// handling in the puller. // handling in the puller.

View File

@ -164,24 +164,12 @@ func (s *FileSet) Update(device protocol.DeviceID, fs []protocol.FileInfo) {
func (s *FileSet) WithNeed(device protocol.DeviceID, fn Iterator) { func (s *FileSet) WithNeed(device protocol.DeviceID, fn Iterator) {
l.Debugf("%s WithNeed(%v)", s.folder, device) l.Debugf("%s WithNeed(%v)", s.folder, device)
s.db.withNeed([]byte(s.folder), device[:], false, false, nativeFileIterator(fn)) s.db.withNeed([]byte(s.folder), device[:], false, nativeFileIterator(fn))
} }
func (s *FileSet) WithNeedTruncated(device protocol.DeviceID, fn Iterator) { func (s *FileSet) WithNeedTruncated(device protocol.DeviceID, fn Iterator) {
l.Debugf("%s WithNeedTruncated(%v)", s.folder, device) l.Debugf("%s WithNeedTruncated(%v)", s.folder, device)
s.db.withNeed([]byte(s.folder), device[:], true, false, nativeFileIterator(fn)) s.db.withNeed([]byte(s.folder), device[:], true, nativeFileIterator(fn))
}
// WithNeedOrInvalid considers all invalid files as needed, regardless of their version
// (e.g. for pulling when ignore patterns changed)
func (s *FileSet) WithNeedOrInvalid(device protocol.DeviceID, fn Iterator) {
l.Debugf("%s WithNeedExcludingInvalid(%v)", s.folder, device)
s.db.withNeed([]byte(s.folder), device[:], false, true, nativeFileIterator(fn))
}
func (s *FileSet) WithNeedOrInvalidTruncated(device protocol.DeviceID, fn Iterator) {
l.Debugf("%s WithNeedExcludingInvalidTruncated(%v)", s.folder, device)
s.db.withNeed([]byte(s.folder), device[:], true, true, nativeFileIterator(fn))
} }
func (s *FileSet) WithHave(device protocol.DeviceID, fn Iterator) { func (s *FileSet) WithHave(device protocol.DeviceID, fn Iterator) {

View File

@ -13,6 +13,7 @@ import (
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/ignore" "github.com/syncthing/syncthing/lib/ignore"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/sync" "github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/watchaggregator" "github.com/syncthing/syncthing/lib/watchaggregator"
) )
@ -23,16 +24,21 @@ type folder struct {
stateTracker stateTracker
config.FolderConfiguration config.FolderConfiguration
model *Model
shortID protocol.ShortID
ctx context.Context
cancel context.CancelFunc
scan folderScanner scan folderScanner
model *Model
ctx context.Context
cancel context.CancelFunc
initialScanFinished chan struct{} initialScanFinished chan struct{}
watchCancel context.CancelFunc
watchChan chan []string pullScheduled chan struct{}
restartWatchChan chan struct{}
watchErr error watchCancel context.CancelFunc
watchErrMut sync.Mutex watchChan chan []string
restartWatchChan chan struct{}
watchErr error
watchErrMut sync.Mutex
} }
func newFolder(model *Model, cfg config.FolderConfiguration) folder { func newFolder(model *Model, cfg config.FolderConfiguration) folder {
@ -42,14 +48,19 @@ func newFolder(model *Model, cfg config.FolderConfiguration) folder {
stateTracker: newStateTracker(cfg.ID), stateTracker: newStateTracker(cfg.ID),
FolderConfiguration: cfg, FolderConfiguration: cfg,
model: model,
shortID: model.shortID,
ctx: ctx,
cancel: cancel,
scan: newFolderScanner(cfg), scan: newFolderScanner(cfg),
ctx: ctx,
cancel: cancel,
model: model,
initialScanFinished: make(chan struct{}), initialScanFinished: make(chan struct{}),
watchCancel: func() {},
watchErr: errWatchNotStarted, pullScheduled: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a pull if we're busy when it comes.
watchErrMut: sync.NewMutex(),
watchCancel: func() {},
watchErr: errWatchNotStarted,
watchErrMut: sync.NewMutex(),
} }
} }
@ -65,7 +76,16 @@ func (f *folder) IgnoresUpdated() {
} }
} }
func (f *folder) SchedulePull() {} func (f *folder) SchedulePull() {
select {
case f.pullScheduled <- struct{}{}:
default:
// We might be busy doing a pull and thus not reading from this
// channel. The channel is 1-buffered, so one notification will be
// queued to ensure we recheck after the pull, but beyond that we must
// make sure to not block index receiving.
}
}
func (f *folder) Jobs() ([]string, []string) { func (f *folder) Jobs() ([]string, []string) {
return nil, nil return nil, nil

View File

@ -26,7 +26,6 @@ import (
"github.com/d4l3k/messagediff" "github.com/d4l3k/messagediff"
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/db" "github.com/syncthing/syncthing/lib/db"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/fs" "github.com/syncthing/syncthing/lib/fs"
"github.com/syncthing/syncthing/lib/ignore" "github.com/syncthing/syncthing/lib/ignore"
"github.com/syncthing/syncthing/lib/osutil" "github.com/syncthing/syncthing/lib/osutil"
@ -50,6 +49,7 @@ func init() {
defaultFolderConfig = config.NewFolderConfiguration(protocol.LocalDeviceID, "default", "default", fs.FilesystemTypeBasic, "testdata") defaultFolderConfig = config.NewFolderConfiguration(protocol.LocalDeviceID, "default", "default", fs.FilesystemTypeBasic, "testdata")
defaultFolderConfig.Devices = []config.FolderDeviceConfiguration{{DeviceID: device1}} defaultFolderConfig.Devices = []config.FolderDeviceConfiguration{{DeviceID: device1}}
_defaultConfig := config.Configuration{ _defaultConfig := config.Configuration{
Version: config.CurrentVersion,
Folders: []config.FolderConfiguration{defaultFolderConfig}, Folders: []config.FolderConfiguration{defaultFolderConfig},
Devices: []config.DeviceConfiguration{config.NewDeviceConfiguration(device1, "device1")}, Devices: []config.DeviceConfiguration{config.NewDeviceConfiguration(device1, "device1")},
Options: config.OptionsConfiguration{ Options: config.OptionsConfiguration{
@ -3442,86 +3442,6 @@ func TestPausedFolders(t *testing.T) {
} }
} }
func TestPullInvalid(t *testing.T) {
if runtime.GOOS != "windows" {
t.Skip("Windows only")
}
tmpDir, err := ioutil.TempDir(".", "_model-")
if err != nil {
panic("Failed to create temporary testing dir")
}
defer os.RemoveAll(tmpDir)
cfg := defaultConfig.RawCopy()
cfg.Folders[0] = config.NewFolderConfiguration(protocol.LocalDeviceID, "default", "default", fs.FilesystemTypeBasic, tmpDir)
cfg.Folders[0].Devices = []config.FolderDeviceConfiguration{{DeviceID: device1}}
w := config.Wrap("/tmp/cfg", cfg)
db := db.OpenMemory()
m := NewModel(w, protocol.LocalDeviceID, "syncthing", "dev", db, nil)
m.AddFolder(cfg.Folders[0])
m.StartFolder("default")
m.ServeBackground()
defer m.Stop()
m.ScanFolder("default")
if err := m.SetIgnores("default", []string{"*:ignored"}); err != nil {
panic(err)
}
ign := "invalid:ignored"
del := "invalid:deleted"
var version protocol.Vector
version = version.Update(device1.Short())
m.IndexUpdate(device1, "default", []protocol.FileInfo{
{
Name: ign,
Size: 1234,
Type: protocol.FileInfoTypeFile,
Version: version,
},
{
Name: del,
Size: 1234,
Type: protocol.FileInfoTypeFile,
Version: version,
Deleted: true,
},
})
sub := events.Default.Subscribe(events.FolderErrors)
defer events.Default.Unsubscribe(sub)
timeout := time.NewTimer(5 * time.Second)
for {
select {
case ev := <-sub.C():
t.Fatalf("Errors while pulling: %v", ev)
case <-timeout.C:
t.Fatalf("File wasn't added to index until timeout")
default:
}
file, ok := m.CurrentFolderFile("default", ign)
if !ok {
time.Sleep(100 * time.Millisecond)
continue
}
if !file.Invalid {
t.Error("Ignored file isn't marked as invalid")
}
if file, ok = m.CurrentFolderFile("default", del); ok {
t.Error("Deleted invalid file was added to index")
}
return
}
}
func addFakeConn(m *Model, dev protocol.DeviceID) *fakeConnection { func addFakeConn(m *Model, dev protocol.DeviceID) *fakeConnection {
fc := &fakeConnection{id: dev, model: m} fc := &fakeConnection{id: dev, model: m}
m.AddConnection(fc, protocol.HelloResult{}) m.AddConnection(fc, protocol.HelloResult{})

View File

@ -8,6 +8,7 @@ package model
import ( import (
"bytes" "bytes"
"errors"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
@ -18,7 +19,9 @@ import (
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/db" "github.com/syncthing/syncthing/lib/db"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/fs" "github.com/syncthing/syncthing/lib/fs"
"github.com/syncthing/syncthing/lib/ignore"
"github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/protocol"
) )
@ -26,9 +29,9 @@ func TestRequestSimple(t *testing.T) {
// Verify that the model performs a request and creates a file based on // Verify that the model performs a request and creates a file based on
// an incoming index update. // an incoming index update.
m, fc, tmpFolder := setupModelWithConnection() m, fc, tmpDir := setupModelWithConnection()
defer m.Stop() defer m.Stop()
defer os.RemoveAll(tmpFolder) defer os.RemoveAll(tmpDir)
// We listen for incoming index updates and trigger when we see one for // We listen for incoming index updates and trigger when we see one for
// the expected test file. // the expected test file.
@ -51,13 +54,8 @@ func TestRequestSimple(t *testing.T) {
<-done <-done
// Verify the contents // Verify the contents
bs, err := ioutil.ReadFile(filepath.Join(tmpFolder, "testfile")) if err := equalContents(filepath.Join(tmpDir, "testfile"), contents); err != nil {
if err != nil {
t.Error("File did not sync correctly:", err) t.Error("File did not sync correctly:", err)
return
}
if !bytes.Equal(bs, contents) {
t.Error("File did not sync correctly: incorrect data")
} }
} }
@ -69,9 +67,9 @@ func TestSymlinkTraversalRead(t *testing.T) {
return return
} }
m, fc, tmpFolder := setupModelWithConnection() m, fc, tmpDir := setupModelWithConnection()
defer m.Stop() defer m.Stop()
defer os.RemoveAll(tmpFolder) defer os.RemoveAll(tmpDir)
// We listen for incoming index updates and trigger when we see one for // We listen for incoming index updates and trigger when we see one for
// the expected test file. // the expected test file.
@ -109,9 +107,9 @@ func TestSymlinkTraversalWrite(t *testing.T) {
return return
} }
m, fc, tmpFolder := setupModelWithConnection() m, fc, tmpDir := setupModelWithConnection()
defer m.Stop() defer m.Stop()
defer os.RemoveAll(tmpFolder) defer os.RemoveAll(tmpDir)
// We listen for incoming index updates and trigger when we see one for // We listen for incoming index updates and trigger when we see one for
// the expected names. // the expected names.
@ -169,9 +167,9 @@ func TestSymlinkTraversalWrite(t *testing.T) {
func TestRequestCreateTmpSymlink(t *testing.T) { func TestRequestCreateTmpSymlink(t *testing.T) {
// Test that an update for a temporary file is invalidated // Test that an update for a temporary file is invalidated
m, fc, tmpFolder := setupModelWithConnection() m, fc, tmpDir := setupModelWithConnection()
defer m.Stop() defer m.Stop()
defer os.RemoveAll(tmpFolder) defer os.RemoveAll(tmpDir)
// We listen for incoming index updates and trigger when we see one for // We listen for incoming index updates and trigger when we see one for
// the expected test file. // the expected test file.
@ -211,12 +209,12 @@ func TestRequestVersioningSymlinkAttack(t *testing.T) {
// Sets up a folder with trashcan versioning and tries to use a // Sets up a folder with trashcan versioning and tries to use a
// deleted symlink to escape // deleted symlink to escape
tmpFolder, err := ioutil.TempDir(".", "_request-") tmpDir, err := ioutil.TempDir(".", "_request-")
if err != nil { if err != nil {
panic("Failed to create temporary testing dir") panic("Failed to create temporary testing dir")
} }
cfg := defaultConfig.RawCopy() cfg := defaultConfig.RawCopy()
cfg.Folders[0] = config.NewFolderConfiguration(protocol.LocalDeviceID, "default", "default", fs.FilesystemTypeBasic, tmpFolder) cfg.Folders[0] = config.NewFolderConfiguration(protocol.LocalDeviceID, "default", "default", fs.FilesystemTypeBasic, tmpDir)
cfg.Folders[0].Devices = []config.FolderDeviceConfiguration{ cfg.Folders[0].Devices = []config.FolderDeviceConfiguration{
{DeviceID: device1}, {DeviceID: device1},
{DeviceID: device2}, {DeviceID: device2},
@ -233,7 +231,7 @@ func TestRequestVersioningSymlinkAttack(t *testing.T) {
m.StartFolder("default") m.StartFolder("default")
defer m.Stop() defer m.Stop()
defer os.RemoveAll(tmpFolder) defer os.RemoveAll(tmpDir)
fc := addFakeConn(m, device2) fc := addFakeConn(m, device2)
fc.folder = "default" fc.folder = "default"
@ -286,17 +284,161 @@ func TestRequestVersioningSymlinkAttack(t *testing.T) {
} }
} }
func setupModelWithConnection() (*Model, *fakeConnection, string) { func TestPullInvalidIgnoredSO(t *testing.T) {
tmpFolder, err := ioutil.TempDir(".", "_request-") pullInvalidIgnored(t, config.FolderTypeSendOnly)
if err != nil {
panic("Failed to create temporary testing dir") }
}
func TestPullInvalidIgnoredSR(t *testing.T) {
pullInvalidIgnored(t, config.FolderTypeSendReceive)
}
// This test checks that (un-)ignored/invalid/deleted files are treated as expected.
func pullInvalidIgnored(t *testing.T, ft config.FolderType) {
t.Helper()
tmpDir := createTmpDir()
defer os.RemoveAll(tmpDir)
cfg := defaultConfig.RawCopy() cfg := defaultConfig.RawCopy()
cfg.Folders[0] = config.NewFolderConfiguration(protocol.LocalDeviceID, "default", "default", fs.FilesystemTypeBasic, tmpFolder) cfg.Devices = append(cfg.Devices, config.NewDeviceConfiguration(device2, "device2"))
cfg.Folders[0] = config.NewFolderConfiguration(protocol.LocalDeviceID, "default", "default", fs.FilesystemTypeBasic, tmpDir)
cfg.Folders[0].Devices = []config.FolderDeviceConfiguration{ cfg.Folders[0].Devices = []config.FolderDeviceConfiguration{
{DeviceID: device1}, {DeviceID: device1},
{DeviceID: device2}, {DeviceID: device2},
} }
cfg.Folders[0].Type = ft
m, fc := setupModelWithConnectionManual(cfg)
defer m.Stop()
// Reach in and update the ignore matcher to one that always does
// reloads when asked to, instead of checking file mtimes. This is
// because we might be changing the files on disk often enough that the
// mtimes will be unreliable to determine change status.
m.fmut.Lock()
m.folderIgnores["default"] = ignore.New(cfg.Folders[0].Filesystem(), ignore.WithChangeDetector(newAlwaysChanged()))
m.fmut.Unlock()
if err := m.SetIgnores("default", []string{"*ignored*"}); err != nil {
panic(err)
}
contents := []byte("test file contents\n")
otherContents := []byte("other test file contents\n")
invIgn := "invalid:ignored"
invDel := "invalid:deleted"
ign := "ignoredNonExisting"
ignExisting := "ignoredExisting"
fc.addFile(invIgn, 0644, protocol.FileInfoTypeFile, contents)
fc.addFile(invDel, 0644, protocol.FileInfoTypeFile, contents)
fc.deleteFile(invDel)
fc.addFile(ign, 0644, protocol.FileInfoTypeFile, contents)
fc.addFile(ignExisting, 0644, protocol.FileInfoTypeFile, contents)
if err := ioutil.WriteFile(filepath.Join(tmpDir, ignExisting), otherContents, 0644); err != nil {
panic(err)
}
done := make(chan struct{})
fc.mut.Lock()
fc.indexFn = func(folder string, fs []protocol.FileInfo) {
expected := map[string]struct{}{invIgn: {}, ign: {}, ignExisting: {}}
for _, f := range fs {
if _, ok := expected[f.Name]; !ok {
t.Fatalf("Unexpected file %v was added to index", f.Name)
}
if !f.Invalid {
t.Errorf("File %v wasn't marked as invalid", f.Name)
}
delete(expected, f.Name)
}
for name := range expected {
t.Errorf("File %v wasn't added to index", name)
}
done <- struct{}{}
}
fc.mut.Unlock()
sub := events.Default.Subscribe(events.FolderErrors)
defer events.Default.Unsubscribe(sub)
fc.sendIndexUpdate()
timeout := time.NewTimer(5 * time.Second)
select {
case ev := <-sub.C():
t.Fatalf("Errors while pulling: %v", ev)
case <-timeout.C:
t.Fatalf("timed out before index was received")
case <-done:
return
}
fc.mut.Lock()
fc.indexFn = func(folder string, fs []protocol.FileInfo) {
expected := map[string]struct{}{ign: {}, ignExisting: {}}
for _, f := range fs {
if _, ok := expected[f.Name]; !ok {
t.Fatalf("Unexpected file %v was updated in index", f.Name)
}
if f.Invalid {
t.Errorf("File %v is still marked as invalid", f.Name)
}
// The unignored files should only have a local version,
// to mark them as in conflict with any other existing versions.
ev := protocol.Vector{}.Update(device1.Short())
if v := f.Version; !v.Equal(ev) {
t.Errorf("File %v has version %v, expected %v", f.Name, v, ev)
}
if f.Name == ign {
if !f.Deleted {
t.Errorf("File %v was not marked as deleted", f.Name)
}
} else if f.Deleted {
t.Errorf("File %v is marked as deleted", f.Name)
}
delete(expected, f.Name)
}
for name := range expected {
t.Errorf("File %v wasn't updated in index", name)
}
done <- struct{}{}
}
// Make sure pulling doesn't interfere, as index updates are racy and
// thus we cannot distinguish between scan and pull results.
fc.requestFn = func(folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) {
return nil, nil
}
fc.mut.Unlock()
if err := m.SetIgnores("default", []string{"*:ignored*"}); err != nil {
panic(err)
}
timeout = time.NewTimer(5 * time.Second)
select {
case <-timeout.C:
t.Fatalf("timed out before index was received")
case <-done:
return
}
}
func setupModelWithConnection() (*Model, *fakeConnection, string) {
tmpDir := createTmpDir()
cfg := defaultConfig.RawCopy()
cfg.Devices = append(cfg.Devices, config.NewDeviceConfiguration(device2, "device2"))
cfg.Folders[0] = config.NewFolderConfiguration(protocol.LocalDeviceID, "default", "default", fs.FilesystemTypeBasic, tmpDir)
cfg.Folders[0].Devices = []config.FolderDeviceConfiguration{
{DeviceID: device1},
{DeviceID: device2},
}
m, fc := setupModelWithConnectionManual(cfg)
return m, fc, tmpDir
}
func setupModelWithConnectionManual(cfg config.Configuration) (*Model, *fakeConnection) {
w := config.Wrap("/tmp/cfg", cfg) w := config.Wrap("/tmp/cfg", cfg)
db := db.OpenMemory() db := db.OpenMemory()
@ -308,5 +450,24 @@ func setupModelWithConnection() (*Model, *fakeConnection, string) {
fc := addFakeConn(m, device2) fc := addFakeConn(m, device2)
fc.folder = "default" fc.folder = "default"
return m, fc, tmpFolder m.ScanFolder("default")
return m, fc
}
func createTmpDir() string {
tmpDir, err := ioutil.TempDir(".", "_request-")
if err != nil {
panic("Failed to create temporary testing dir")
}
return tmpDir
}
func equalContents(path string, contents []byte) error {
if bs, err := ioutil.ReadFile(path); err != nil {
return err
} else if !bytes.Equal(bs, contents) {
return errors.New("incorrect data")
}
return nil
} }

View File

@ -10,7 +10,9 @@ import (
"fmt" "fmt"
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/db"
"github.com/syncthing/syncthing/lib/fs" "github.com/syncthing/syncthing/lib/fs"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/versioner" "github.com/syncthing/syncthing/lib/versioner"
) )
@ -43,6 +45,9 @@ func (f *sendOnlyFolder) Serve() {
case <-f.ctx.Done(): case <-f.ctx.Done():
return return
case <-f.pullScheduled:
f.pull()
case <-f.restartWatchChan: case <-f.restartWatchChan:
f.restartWatch() f.restartWatch()
@ -70,3 +75,62 @@ func (f *sendOnlyFolder) String() string {
func (f *sendOnlyFolder) PullErrors() []FileError { func (f *sendOnlyFolder) PullErrors() []FileError {
return nil return nil
} }
// pull checks need for files that only differ by metadata (no changes on disk)
func (f *sendOnlyFolder) pull() {
select {
case <-f.initialScanFinished:
default:
// Once the initial scan finished, a pull will be scheduled
return
}
f.model.fmut.RLock()
folderFiles := f.model.folderFiles[f.folderID]
ignores := f.model.folderIgnores[f.folderID]
f.model.fmut.RUnlock()
batch := make([]protocol.FileInfo, 0, maxBatchSizeFiles)
batchSizeBytes := 0
folderFiles.WithNeed(protocol.LocalDeviceID, func(intf db.FileIntf) bool {
if len(batch) == maxBatchSizeFiles || batchSizeBytes > maxBatchSizeBytes {
f.model.updateLocalsFromPulling(f.folderID, batch)
batch = batch[:0]
batchSizeBytes = 0
}
if ignores.ShouldIgnore(intf.FileName()) {
file := intf.(protocol.FileInfo)
file.Invalidate(f.shortID)
batch = append(batch, file)
batchSizeBytes += file.ProtoSize()
l.Debugln(f, "Handling ignored file", file)
return true
}
curFile, ok := f.model.CurrentFolderFile(f.folderID, intf.FileName())
if !ok {
if intf.IsDeleted() {
panic("Should never get a deleted file as needed when we don't have it")
}
return true
}
file := intf.(protocol.FileInfo)
if !file.IsEquivalent(curFile, f.IgnorePerms, false) {
return true
}
file.Version = file.Version.Merge(curFile.Version)
batch = append(batch, file)
batchSizeBytes += file.ProtoSize()
l.Debugln(f, "Merging versions of identical file", file)
return true
})
if len(batch) > 0 {
f.model.updateLocalsFromPulling(f.folderID, batch)
}
}

View File

@ -96,8 +96,7 @@ type sendReceiveFolder struct {
versioner versioner.Versioner versioner versioner.Versioner
pause time.Duration pause time.Duration
queue *jobQueue queue *jobQueue
pullScheduled chan struct{}
errors map[string]string // path -> error string errors map[string]string // path -> error string
errorsMut sync.Mutex errorsMut sync.Mutex
@ -110,8 +109,7 @@ func newSendReceiveFolder(model *Model, cfg config.FolderConfiguration, ver vers
fs: fs, fs: fs,
versioner: ver, versioner: ver,
queue: newJobQueue(), queue: newJobQueue(),
pullScheduled: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a pull if we're busy when it comes.
errorsMut: sync.NewMutex(), errorsMut: sync.NewMutex(),
} }
@ -132,13 +130,6 @@ func (f *sendReceiveFolder) configureCopiersAndPullers() {
f.pause = f.basePause() f.pause = f.basePause()
} }
// Helper function to check whether either the ignorePerm flag has been
// set on the local host or the FlagNoPermBits has been set on the file/dir
// which is being pulled.
func (f *sendReceiveFolder) ignorePermissions(file protocol.FileInfo) bool {
return f.IgnorePerms || file.NoPermissions
}
// Serve will run scans and pulls. It will return when Stop()ed or on a // Serve will run scans and pulls. It will return when Stop()ed or on a
// critical error. // critical error.
func (f *sendReceiveFolder) Serve() { func (f *sendReceiveFolder) Serve() {
@ -371,14 +362,7 @@ func (f *sendReceiveFolder) pullerIteration(ignores *ignore.Matcher, ignoresChan
// Regular files to pull goes into the file queue, everything else // Regular files to pull goes into the file queue, everything else
// (directories, symlinks and deletes) goes into the "process directly" // (directories, symlinks and deletes) goes into the "process directly"
// pile. // pile.
folderFiles.WithNeed(protocol.LocalDeviceID, func(intf db.FileIntf) bool {
// Don't iterate over invalid/ignored files unless ignores have changed
iterate := folderFiles.WithNeed
if ignoresChanged {
iterate = folderFiles.WithNeedOrInvalid
}
iterate(protocol.LocalDeviceID, func(intf db.FileIntf) bool {
if f.IgnoreDelete && intf.IsDeleted() { if f.IgnoreDelete && intf.IsDeleted() {
l.Debugln(f, "ignore file deletion (config)", intf.FileName()) l.Debugln(f, "ignore file deletion (config)", intf.FileName())
return true return true
@ -388,7 +372,7 @@ func (f *sendReceiveFolder) pullerIteration(ignores *ignore.Matcher, ignoresChan
switch { switch {
case ignores.ShouldIgnore(file.Name): case ignores.ShouldIgnore(file.Name):
file.Invalidate(f.model.id.Short()) file.Invalidate(f.shortID)
l.Debugln(f, "Handling ignored file", file) l.Debugln(f, "Handling ignored file", file)
dbUpdateChan <- dbUpdateJob{file, dbUpdateInvalidate} dbUpdateChan <- dbUpdateJob{file, dbUpdateInvalidate}
@ -416,7 +400,7 @@ func (f *sendReceiveFolder) pullerIteration(ignores *ignore.Matcher, ignoresChan
l.Debugln(f, "Needed file is unavailable", file) l.Debugln(f, "Needed file is unavailable", file)
case runtime.GOOS == "windows" && file.IsSymlink(): case runtime.GOOS == "windows" && file.IsSymlink():
file.Invalidate(f.model.id.Short()) file.Invalidate(f.shortID)
l.Debugln(f, "Invalidating symlink (unsupported)", file.Name) l.Debugln(f, "Invalidating symlink (unsupported)", file.Name)
dbUpdateChan <- dbUpdateJob{file, dbUpdateInvalidate} dbUpdateChan <- dbUpdateJob{file, dbUpdateInvalidate}
@ -562,7 +546,7 @@ nextFile:
// we can just do a rename instead. // we can just do a rename instead.
key := string(fi.Blocks[0].Hash) key := string(fi.Blocks[0].Hash)
for i, candidate := range buckets[key] { for i, candidate := range buckets[key] {
if blocksEqual(candidate.Blocks, fi.Blocks) { if protocol.BlocksEqual(candidate.Blocks, fi.Blocks) {
// Remove the candidate from the bucket // Remove the candidate from the bucket
lidx := len(buckets[key]) - 1 lidx := len(buckets[key]) - 1
buckets[key][i] = buckets[key][lidx] buckets[key][i] = buckets[key][lidx]
@ -617,21 +601,6 @@ nextFile:
return changed return changed
} }
// blocksEqual returns whether two slices of blocks are exactly the same hash
// and index pair wise.
func blocksEqual(src, tgt []protocol.BlockInfo) bool {
if len(tgt) != len(src) {
return false
}
for i, sblk := range src {
if !bytes.Equal(sblk.Hash, tgt[i].Hash) {
return false
}
}
return true
}
// handleDir creates or updates the given directory // handleDir creates or updates the given directory
func (f *sendReceiveFolder) handleDir(file protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob) { func (f *sendReceiveFolder) handleDir(file protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob) {
// Used in the defer closure below, updated by the function body. Take // Used in the defer closure below, updated by the function body. Take
@ -656,7 +625,7 @@ func (f *sendReceiveFolder) handleDir(file protocol.FileInfo, dbUpdateChan chan<
}() }()
mode := fs.FileMode(file.Permissions & 0777) mode := fs.FileMode(file.Permissions & 0777)
if f.ignorePermissions(file) { if f.IgnorePerms || file.NoPermissions {
mode = 0777 mode = 0777
} }
@ -685,7 +654,7 @@ func (f *sendReceiveFolder) handleDir(file protocol.FileInfo, dbUpdateChan chan<
// not MkdirAll because the parent should already exist. // not MkdirAll because the parent should already exist.
mkdir := func(path string) error { mkdir := func(path string) error {
err = f.fs.Mkdir(path, mode) err = f.fs.Mkdir(path, mode)
if err != nil || f.ignorePermissions(file) { if err != nil || f.IgnorePerms || file.NoPermissions {
return err return err
} }
@ -716,7 +685,7 @@ func (f *sendReceiveFolder) handleDir(file protocol.FileInfo, dbUpdateChan chan<
// The directory already exists, so we just correct the mode bits. (We // The directory already exists, so we just correct the mode bits. (We
// don't handle modification times on directories, because that sucks...) // don't handle modification times on directories, because that sucks...)
// It's OK to change mode bits on stuff within non-writable directories. // It's OK to change mode bits on stuff within non-writable directories.
if f.ignorePermissions(file) { if f.IgnorePerms || file.NoPermissions {
dbUpdateChan <- dbUpdateJob{file, dbUpdateHandleDir} dbUpdateChan <- dbUpdateJob{file, dbUpdateHandleDir}
} else if err := f.fs.Chmod(file.Name, mode|(fs.FileMode(info.Mode())&retainBits)); err == nil { } else if err := f.fs.Chmod(file.Name, mode|(fs.FileMode(info.Mode())&retainBits)); err == nil {
dbUpdateChan <- dbUpdateJob{file, dbUpdateHandleDir} dbUpdateChan <- dbUpdateJob{file, dbUpdateHandleDir}
@ -1107,7 +1076,7 @@ func (f *sendReceiveFolder) handleFile(file protocol.FileInfo, copyChan chan<- c
updated: time.Now(), updated: time.Now(),
available: reused, available: reused,
availableUpdated: time.Now(), availableUpdated: time.Now(),
ignorePerms: f.ignorePermissions(file), ignorePerms: f.IgnorePerms || file.NoPermissions,
hasCurFile: hasCurFile, hasCurFile: hasCurFile,
curFile: curFile, curFile: curFile,
mut: sync.NewRWMutex(), mut: sync.NewRWMutex(),
@ -1167,7 +1136,7 @@ func populateOffsets(blocks []protocol.BlockInfo) {
// shortcutFile sets file mode and modification time, when that's the only // shortcutFile sets file mode and modification time, when that's the only
// thing that has changed. // thing that has changed.
func (f *sendReceiveFolder) shortcutFile(file protocol.FileInfo) error { func (f *sendReceiveFolder) shortcutFile(file protocol.FileInfo) error {
if !f.ignorePermissions(file) { if !f.IgnorePerms && !file.NoPermissions {
if err := f.fs.Chmod(file.Name, fs.FileMode(file.Permissions&0777)); err != nil { if err := f.fs.Chmod(file.Name, fs.FileMode(file.Permissions&0777)); err != nil {
f.newError("shortcut chmod", file.Name, err) f.newError("shortcut chmod", file.Name, err)
return err return err
@ -1445,7 +1414,7 @@ func (f *sendReceiveFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *
func (f *sendReceiveFolder) performFinish(ignores *ignore.Matcher, state *sharedPullerState, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) error { func (f *sendReceiveFolder) performFinish(ignores *ignore.Matcher, state *sharedPullerState, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) error {
// Set the correct permission bits on the new file // Set the correct permission bits on the new file
if !f.ignorePermissions(state.file) { if !f.IgnorePerms && !state.file.NoPermissions {
if err := f.fs.Chmod(state.tempName, fs.FileMode(state.file.Permissions&0777)); err != nil { if err := f.fs.Chmod(state.tempName, fs.FileMode(state.file.Permissions&0777)); err != nil {
return err return err
} }
@ -1490,7 +1459,7 @@ func (f *sendReceiveFolder) performFinish(ignores *ignore.Matcher, state *shared
case stat.IsDir(): case stat.IsDir():
// Dirs only have perm, no modetime/size // Dirs only have perm, no modetime/size
if !f.ignorePermissions(state.curFile) && state.curFile.HasPermissionBits() && !scanner.PermsEqual(state.curFile.Permissions, curMode) { if !f.IgnorePerms && !state.curFile.NoPermissions && state.curFile.HasPermissionBits() && !protocol.PermsEqual(state.curFile.Permissions, curMode) {
l.Debugln("file permission modified but not rescanned; not finishing:", state.curFile.Name) l.Debugln("file permission modified but not rescanned; not finishing:", state.curFile.Name)
changed = true changed = true
} }
@ -1722,7 +1691,7 @@ func (f *sendReceiveFolder) inConflict(current, replacement protocol.Vector) boo
// Obvious case // Obvious case
return true return true
} }
if replacement.Counter(f.model.shortID) > current.Counter(f.model.shortID) { if replacement.Counter(f.shortID) > current.Counter(f.shortID) {
// The replacement file contains a higher version for ourselves than // The replacement file contains a higher version for ourselves than
// what we have. This isn't supposed to be possible, since it's only // what we have. This isn't supposed to be possible, since it's only
// we who can increment that counter. We take it as a sign that // we who can increment that counter. We take it as a sign that
@ -1825,11 +1794,6 @@ func (f *sendReceiveFolder) basePause() time.Duration {
return time.Duration(f.PullerPauseS) * time.Second return time.Duration(f.PullerPauseS) * time.Second
} }
func (f *sendReceiveFolder) IgnoresUpdated() {
f.folder.IgnoresUpdated()
f.SchedulePull()
}
// deleteDir attempts to delete a directory. It checks for files/dirs inside // deleteDir attempts to delete a directory. It checks for files/dirs inside
// the directory and removes them if possible or returns an error if it fails // the directory and removes them if possible or returns an error if it fails
func (f *sendReceiveFolder) deleteDir(dir string, ignores *ignore.Matcher, scanChan chan<- string) error { func (f *sendReceiveFolder) deleteDir(dir string, ignores *ignore.Matcher, scanChan chan<- string) error {

View File

@ -10,6 +10,7 @@ import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt" "fmt"
"runtime"
"time" "time"
"github.com/syncthing/syncthing/lib/rand" "github.com/syncthing/syncthing/lib/rand"
@ -122,6 +123,74 @@ func (f FileInfo) WinsConflict(other FileInfo) bool {
return f.Version.Compare(other.Version) == ConcurrentGreater return f.Version.Compare(other.Version) == ConcurrentGreater
} }
func (f FileInfo) IsEmpty() bool {
return f.Version.Counters == nil
}
// IsEquivalent checks that the two file infos represent the same actual file content,
// i.e. it does purposely not check only selected (see below) struct members.
// Permissions (config) and blocks (scanning) can be excluded from the comparison.
// Any file info is not "equivalent", if it has different
// - type
// - deleted flag
// - invalid flag
// - permissions, unless they are ignored
// A file is not "equivalent", if it has different
// - modification time
// - size
// - blocks, unless there are no blocks to compare (scanning)
// A symlink is not "equivalent", if it has different
// - target
// A directory does not have anything specific to check.
func (f FileInfo) IsEquivalent(other FileInfo, ignorePerms bool, ignoreBlocks bool) bool {
if f.Name != other.Name || f.Type != other.Type || f.Deleted != other.Deleted || f.Invalid != other.Invalid {
return false
}
if !ignorePerms && !f.NoPermissions && !other.NoPermissions && !PermsEqual(f.Permissions, other.Permissions) {
return false
}
switch f.Type {
case FileInfoTypeFile:
return f.Size == other.Size && f.ModTime().Equal(other.ModTime()) && (ignoreBlocks || BlocksEqual(f.Blocks, other.Blocks))
case FileInfoTypeSymlink:
return f.SymlinkTarget == other.SymlinkTarget
case FileInfoTypeDirectory:
return true
}
return false
}
func PermsEqual(a, b uint32) bool {
switch runtime.GOOS {
case "windows":
// There is only writeable and read only, represented for user, group
// and other equally. We only compare against user.
return a&0600 == b&0600
default:
// All bits count
return a&0777 == b&0777
}
}
// BlocksEqual returns whether two slices of blocks are exactly the same hash
// and index pair wise.
func BlocksEqual(a, b []BlockInfo) bool {
if len(b) != len(a) {
return false
}
for i, sblk := range a {
if !bytes.Equal(sblk.Hash, b[i].Hash) {
return false
}
}
return true
}
func (f *FileInfo) Invalidate(invalidatedBy ShortID) { func (f *FileInfo) Invalidate(invalidatedBy ShortID) {
f.Invalid = true f.Invalid = true
f.ModifiedBy = invalidatedBy f.ModifiedBy = invalidatedBy

View File

@ -109,6 +109,18 @@ func (v Vector) Counter(id ShortID) uint64 {
return 0 return 0
} }
// DropOthers removes all counters, keeping only the one with given id. If there
// is no such counter, an empty Vector is returned.
func (v Vector) DropOthers(id ShortID) Vector {
for i, c := range v.Counters {
if c.ID == id {
v.Counters = v.Counters[i : i+1]
return v
}
}
return Vector{}
}
// Ordering represents the relationship between two Vectors. // Ordering represents the relationship between two Vectors.
type Ordering int type Ordering int

View File

@ -285,25 +285,7 @@ func (w *walker) walkRegular(ctx context.Context, relPath string, info fs.FileIn
curMode |= 0111 curMode |= 0111
} }
// A file is "unchanged", if it
// - exists
// - has the same permissions as previously, unless we are ignoring permissions
// - was not marked deleted (since it apparently exists now)
// - had the same modification time as it has now
// - was not a directory previously (since it's a file now)
// - was not a symlink (since it's a file now)
// - was not invalid (since it looks valid now)
// - has the same size as previously
cf, ok := w.CurrentFiler.CurrentFile(relPath) cf, ok := w.CurrentFiler.CurrentFile(relPath)
permUnchanged := w.IgnorePerms || !cf.HasPermissionBits() || PermsEqual(cf.Permissions, curMode)
if ok && permUnchanged && !cf.IsDeleted() && cf.ModTime().Equal(info.ModTime()) && !cf.IsDirectory() &&
!cf.IsSymlink() && !cf.IsInvalid() && cf.Size == info.Size() {
return nil
}
if ok {
l.Debugln("rescan:", cf, info.ModTime().Unix(), info.Mode()&fs.ModePerm)
}
f := protocol.FileInfo{ f := protocol.FileInfo{
Name: relPath, Name: relPath,
@ -316,6 +298,21 @@ func (w *walker) walkRegular(ctx context.Context, relPath string, info fs.FileIn
ModifiedBy: w.ShortID, ModifiedBy: w.ShortID,
Size: info.Size(), Size: info.Size(),
} }
if ok {
if cf.IsEquivalent(f, w.IgnorePerms, true) {
return nil
}
if cf.Invalid {
// We do not want to override the global version with the file we
// currently have. Keeping only our local counter makes sure we are in
// conflict with any other existing versions, which will be resolved by
// the normal pulling mechanisms.
f.Version.DropOthers(w.ShortID)
}
l.Debugln("rescan:", cf, info.ModTime().Unix(), info.Mode()&fs.ModePerm)
}
l.Debugln("to hash:", relPath, f) l.Debugln("to hash:", relPath, f)
select { select {
@ -328,18 +325,7 @@ func (w *walker) walkRegular(ctx context.Context, relPath string, info fs.FileIn
} }
func (w *walker) walkDir(ctx context.Context, relPath string, info fs.FileInfo, dchan chan protocol.FileInfo) error { func (w *walker) walkDir(ctx context.Context, relPath string, info fs.FileInfo, dchan chan protocol.FileInfo) error {
// A directory is "unchanged", if it
// - exists
// - has the same permissions as previously, unless we are ignoring permissions
// - was not marked deleted (since it apparently exists now)
// - was a directory previously (not a file or something else)
// - was not a symlink (since it's a directory now)
// - was not invalid (since it looks valid now)
cf, ok := w.CurrentFiler.CurrentFile(relPath) cf, ok := w.CurrentFiler.CurrentFile(relPath)
permUnchanged := w.IgnorePerms || !cf.HasPermissionBits() || PermsEqual(cf.Permissions, uint32(info.Mode()))
if ok && permUnchanged && !cf.IsDeleted() && cf.IsDirectory() && !cf.IsSymlink() && !cf.IsInvalid() {
return nil
}
f := protocol.FileInfo{ f := protocol.FileInfo{
Name: relPath, Name: relPath,
@ -351,6 +337,20 @@ func (w *walker) walkDir(ctx context.Context, relPath string, info fs.FileInfo,
ModifiedNs: int32(info.ModTime().Nanosecond()), ModifiedNs: int32(info.ModTime().Nanosecond()),
ModifiedBy: w.ShortID, ModifiedBy: w.ShortID,
} }
if ok {
if cf.IsEquivalent(f, w.IgnorePerms, true) {
return nil
}
if cf.Invalid {
// We do not want to override the global version with the file we
// currently have. Keeping only our local counter makes sure we are in
// conflict with any other existing versions, which will be resolved by
// the normal pulling mechanisms.
f.Version.DropOthers(w.ShortID)
}
}
l.Debugln("dir:", relPath, f) l.Debugln("dir:", relPath, f)
select { select {
@ -382,16 +382,7 @@ func (w *walker) walkSymlink(ctx context.Context, relPath string, dchan chan pro
return nil return nil
} }
// A symlink is "unchanged", if
// - it exists
// - it wasn't deleted (because it isn't now)
// - it was a symlink
// - it wasn't invalid
// - the target was the same
cf, ok := w.CurrentFiler.CurrentFile(relPath) cf, ok := w.CurrentFiler.CurrentFile(relPath)
if ok && !cf.IsDeleted() && cf.IsSymlink() && !cf.IsInvalid() && cf.SymlinkTarget == target {
return nil
}
f := protocol.FileInfo{ f := protocol.FileInfo{
Name: relPath, Name: relPath,
@ -401,6 +392,19 @@ func (w *walker) walkSymlink(ctx context.Context, relPath string, dchan chan pro
SymlinkTarget: target, SymlinkTarget: target,
} }
if ok {
if cf.IsEquivalent(f, w.IgnorePerms, true) {
return nil
}
if cf.Invalid {
// We do not want to override the global version with the file we
// currently have. Keeping only our local counter makes sure we are in
// conflict with any other existing versions, which will be resolved by
// the normal pulling mechanisms.
f.Version.DropOthers(w.ShortID)
}
}
l.Debugln("symlink changedb:", relPath, f) l.Debugln("symlink changedb:", relPath, f)
select { select {
@ -475,18 +479,6 @@ func (w *walker) normalizePath(path string, info fs.FileInfo) (normPath string,
return normPath, false return normPath, false
} }
func PermsEqual(a, b uint32) bool {
switch runtime.GOOS {
case "windows":
// There is only writeable and read only, represented for user, group
// and other equally. We only compare against user.
return a&0600 == b&0600
default:
// All bits count
return a&0777 == b&0777
}
}
// A byteCounter gets bytes added to it via Update() and then provides the // A byteCounter gets bytes added to it via Update() and then provides the
// Total() and one minute moving average Rate() in bytes per second. // Total() and one minute moving average Rate() in bytes per second.
type byteCounter struct { type byteCounter struct {