mirror of
https://github.com/octoleo/syncthing.git
synced 2025-01-05 08:02:13 +00:00
This commit is contained in:
parent
accaf23aa3
commit
df48276300
@ -885,8 +885,14 @@ func (f *fakeFile) Truncate(size int64) error {
|
|||||||
defer f.mut.Unlock()
|
defer f.mut.Unlock()
|
||||||
|
|
||||||
if f.content != nil {
|
if f.content != nil {
|
||||||
|
if int64(cap(f.content)) < size {
|
||||||
|
c := make([]byte, size)
|
||||||
|
copy(c[:len(f.content)], f.content)
|
||||||
|
f.content = c
|
||||||
|
} else {
|
||||||
f.content = f.content[:int(size)]
|
f.content = f.content[:int(size)]
|
||||||
}
|
}
|
||||||
|
}
|
||||||
f.rng = nil
|
f.rng = nil
|
||||||
f.size = size
|
f.size = size
|
||||||
if f.offset > size {
|
if f.offset > size {
|
||||||
|
@ -29,6 +29,7 @@ func TestRecvOnlyRevertDeletes(t *testing.T) {
|
|||||||
defer wcfgCancel()
|
defer wcfgCancel()
|
||||||
ffs := f.Filesystem()
|
ffs := f.Filesystem()
|
||||||
defer cleanupModel(m)
|
defer cleanupModel(m)
|
||||||
|
addFakeConn(m, device1, f.ID)
|
||||||
|
|
||||||
// Create some test data
|
// Create some test data
|
||||||
|
|
||||||
@ -110,6 +111,7 @@ func TestRecvOnlyRevertNeeds(t *testing.T) {
|
|||||||
defer wcfgCancel()
|
defer wcfgCancel()
|
||||||
ffs := f.Filesystem()
|
ffs := f.Filesystem()
|
||||||
defer cleanupModel(m)
|
defer cleanupModel(m)
|
||||||
|
addFakeConn(m, device1, f.ID)
|
||||||
|
|
||||||
// Create some test data
|
// Create some test data
|
||||||
|
|
||||||
@ -199,6 +201,7 @@ func TestRecvOnlyUndoChanges(t *testing.T) {
|
|||||||
defer wcfgCancel()
|
defer wcfgCancel()
|
||||||
ffs := f.Filesystem()
|
ffs := f.Filesystem()
|
||||||
defer cleanupModel(m)
|
defer cleanupModel(m)
|
||||||
|
addFakeConn(m, device1, f.ID)
|
||||||
|
|
||||||
// Create some test data
|
// Create some test data
|
||||||
|
|
||||||
@ -268,6 +271,7 @@ func TestRecvOnlyDeletedRemoteDrop(t *testing.T) {
|
|||||||
defer wcfgCancel()
|
defer wcfgCancel()
|
||||||
ffs := f.Filesystem()
|
ffs := f.Filesystem()
|
||||||
defer cleanupModel(m)
|
defer cleanupModel(m)
|
||||||
|
addFakeConn(m, device1, f.ID)
|
||||||
|
|
||||||
// Create some test data
|
// Create some test data
|
||||||
|
|
||||||
@ -332,6 +336,7 @@ func TestRecvOnlyRemoteUndoChanges(t *testing.T) {
|
|||||||
defer wcfgCancel()
|
defer wcfgCancel()
|
||||||
ffs := f.Filesystem()
|
ffs := f.Filesystem()
|
||||||
defer cleanupModel(m)
|
defer cleanupModel(m)
|
||||||
|
addFakeConn(m, device1, f.ID)
|
||||||
|
|
||||||
// Create some test data
|
// Create some test data
|
||||||
|
|
||||||
|
@ -1280,6 +1280,7 @@ func TestPullSymlinkOverExistingWindows(t *testing.T) {
|
|||||||
|
|
||||||
m, f, wcfgCancel := setupSendReceiveFolder(t)
|
m, f, wcfgCancel := setupSendReceiveFolder(t)
|
||||||
defer cleanupSRFolder(f, m, wcfgCancel)
|
defer cleanupSRFolder(f, m, wcfgCancel)
|
||||||
|
addFakeConn(m, device1, f.ID)
|
||||||
|
|
||||||
name := "foo"
|
name := "foo"
|
||||||
if fd, err := f.mtimefs.Create(name); err != nil {
|
if fd, err := f.mtimefs.Create(name); err != nil {
|
||||||
|
549
lib/model/indexhandler.go
Normal file
549
lib/model/indexhandler.go
Normal file
@ -0,0 +1,549 @@
|
|||||||
|
// Copyright (C) 2020 The Syncthing Authors.
|
||||||
|
//
|
||||||
|
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||||
|
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
||||||
|
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||||
|
|
||||||
|
package model
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/thejerf/suture/v4"
|
||||||
|
|
||||||
|
"github.com/syncthing/syncthing/lib/config"
|
||||||
|
"github.com/syncthing/syncthing/lib/db"
|
||||||
|
"github.com/syncthing/syncthing/lib/events"
|
||||||
|
"github.com/syncthing/syncthing/lib/protocol"
|
||||||
|
"github.com/syncthing/syncthing/lib/svcutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
type indexHandler struct {
|
||||||
|
conn protocol.Connection
|
||||||
|
downloads *deviceDownloadState
|
||||||
|
folder string
|
||||||
|
folderIsReceiveEncrypted bool
|
||||||
|
prevSequence int64
|
||||||
|
evLogger events.Logger
|
||||||
|
token suture.ServiceToken
|
||||||
|
|
||||||
|
cond *sync.Cond
|
||||||
|
paused bool
|
||||||
|
fset *db.FileSet
|
||||||
|
runner service
|
||||||
|
}
|
||||||
|
|
||||||
|
func newIndexHandler(conn protocol.Connection, downloads *deviceDownloadState, folder config.FolderConfiguration, fset *db.FileSet, runner service, startInfo *clusterConfigDeviceInfo, evLogger events.Logger) *indexHandler {
|
||||||
|
myIndexID := fset.IndexID(protocol.LocalDeviceID)
|
||||||
|
mySequence := fset.Sequence(protocol.LocalDeviceID)
|
||||||
|
var startSequence int64
|
||||||
|
|
||||||
|
// This is the other side's description of what it knows
|
||||||
|
// about us. Lets check to see if we can start sending index
|
||||||
|
// updates directly or need to send the index from start...
|
||||||
|
|
||||||
|
if startInfo.local.IndexID == myIndexID {
|
||||||
|
// They say they've seen our index ID before, so we can
|
||||||
|
// send a delta update only.
|
||||||
|
|
||||||
|
if startInfo.local.MaxSequence > mySequence {
|
||||||
|
// Safety check. They claim to have more or newer
|
||||||
|
// index data than we have - either we have lost
|
||||||
|
// index data, or reset the index without resetting
|
||||||
|
// the IndexID, or something else weird has
|
||||||
|
// happened. We send a full index to reset the
|
||||||
|
// situation.
|
||||||
|
l.Infof("Device %v folder %s is delta index compatible, but seems out of sync with reality", conn.ID().Short(), folder.Description())
|
||||||
|
startSequence = 0
|
||||||
|
} else {
|
||||||
|
l.Debugf("Device %v folder %s is delta index compatible (mlv=%d)", conn.ID().Short(), folder.Description(), startInfo.local.MaxSequence)
|
||||||
|
startSequence = startInfo.local.MaxSequence
|
||||||
|
}
|
||||||
|
} else if startInfo.local.IndexID != 0 {
|
||||||
|
// They say they've seen an index ID from us, but it's
|
||||||
|
// not the right one. Either they are confused or we
|
||||||
|
// must have reset our database since last talking to
|
||||||
|
// them. We'll start with a full index transfer.
|
||||||
|
l.Infof("Device %v folder %s has mismatching index ID for us (%v != %v)", conn.ID().Short(), folder.Description(), startInfo.local.IndexID, myIndexID)
|
||||||
|
startSequence = 0
|
||||||
|
} else {
|
||||||
|
l.Debugf("Device %v folder %s has no index ID for us", conn.ID().Short(), folder.Description())
|
||||||
|
}
|
||||||
|
|
||||||
|
// This is the other side's description of themselves. We
|
||||||
|
// check to see that it matches the IndexID we have on file,
|
||||||
|
// otherwise we drop our old index data and expect to get a
|
||||||
|
// completely new set.
|
||||||
|
|
||||||
|
theirIndexID := fset.IndexID(conn.ID())
|
||||||
|
if startInfo.remote.IndexID == 0 {
|
||||||
|
// They're not announcing an index ID. This means they
|
||||||
|
// do not support delta indexes and we should clear any
|
||||||
|
// information we have from them before accepting their
|
||||||
|
// index, which will presumably be a full index.
|
||||||
|
l.Debugf("Device %v folder %s does not announce an index ID", conn.ID().Short(), folder.Description())
|
||||||
|
fset.Drop(conn.ID())
|
||||||
|
} else if startInfo.remote.IndexID != theirIndexID {
|
||||||
|
// The index ID we have on file is not what they're
|
||||||
|
// announcing. They must have reset their database and
|
||||||
|
// will probably send us a full index. We drop any
|
||||||
|
// information we have and remember this new index ID
|
||||||
|
// instead.
|
||||||
|
l.Infof("Device %v folder %s has a new index ID (%v)", conn.ID().Short(), folder.Description(), startInfo.remote.IndexID)
|
||||||
|
fset.Drop(conn.ID())
|
||||||
|
fset.SetIndexID(conn.ID(), startInfo.remote.IndexID)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &indexHandler{
|
||||||
|
conn: conn,
|
||||||
|
downloads: downloads,
|
||||||
|
folder: folder.ID,
|
||||||
|
folderIsReceiveEncrypted: folder.Type == config.FolderTypeReceiveEncrypted,
|
||||||
|
prevSequence: startSequence,
|
||||||
|
evLogger: evLogger,
|
||||||
|
|
||||||
|
fset: fset,
|
||||||
|
runner: runner,
|
||||||
|
cond: sync.NewCond(new(sync.Mutex)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *indexHandler) Serve(ctx context.Context) (err error) {
|
||||||
|
l.Debugf("Starting index handler for %s to %s at %s (slv=%d)", s.folder, s.conn.ID(), s.conn, s.prevSequence)
|
||||||
|
defer func() {
|
||||||
|
err = svcutil.NoRestartErr(err)
|
||||||
|
l.Debugf("Exiting index handler for %s to %s at %s: %v", s.folder, s.conn.ID(), s.conn, err)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// We need to send one index, regardless of whether there is something to send or not
|
||||||
|
s.cond.L.Lock()
|
||||||
|
for s.paused {
|
||||||
|
s.cond.Wait()
|
||||||
|
}
|
||||||
|
fset := s.fset
|
||||||
|
s.cond.L.Unlock()
|
||||||
|
err = s.sendIndexTo(ctx, fset)
|
||||||
|
|
||||||
|
// Subscribe to LocalIndexUpdated (we have new information to send) and
|
||||||
|
// DeviceDisconnected (it might be us who disconnected, so we should
|
||||||
|
// exit).
|
||||||
|
sub := s.evLogger.Subscribe(events.LocalIndexUpdated | events.DeviceDisconnected)
|
||||||
|
defer sub.Unsubscribe()
|
||||||
|
|
||||||
|
evChan := sub.C()
|
||||||
|
ticker := time.NewTicker(time.Minute)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for err == nil {
|
||||||
|
s.cond.L.Lock()
|
||||||
|
for s.paused {
|
||||||
|
s.cond.Wait()
|
||||||
|
}
|
||||||
|
fset := s.fset
|
||||||
|
s.cond.L.Unlock()
|
||||||
|
|
||||||
|
// While we have sent a sequence at least equal to the one
|
||||||
|
// currently in the database, wait for the local index to update. The
|
||||||
|
// local index may update for other folders than the one we are
|
||||||
|
// sending for.
|
||||||
|
if fset.Sequence(protocol.LocalDeviceID) <= s.prevSequence {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case <-evChan:
|
||||||
|
case <-ticker.C:
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
err = s.sendIndexTo(ctx, fset)
|
||||||
|
|
||||||
|
// Wait a short amount of time before entering the next loop. If there
|
||||||
|
// are continuous changes happening to the local index, this gives us
|
||||||
|
// time to batch them up a little.
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case <-time.After(250 * time.Millisecond):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *indexHandler) resume(fset *db.FileSet, runner service) {
|
||||||
|
s.cond.L.Lock()
|
||||||
|
if !s.paused {
|
||||||
|
s.evLogger.Log(events.Failure, "index handler got resumed while not paused")
|
||||||
|
}
|
||||||
|
s.paused = false
|
||||||
|
s.fset = fset
|
||||||
|
s.runner = runner
|
||||||
|
s.cond.L.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *indexHandler) pause() {
|
||||||
|
s.cond.L.Lock()
|
||||||
|
if s.paused {
|
||||||
|
s.evLogger.Log(events.Failure, "index handler got paused while already paused")
|
||||||
|
}
|
||||||
|
s.paused = true
|
||||||
|
s.fset = nil
|
||||||
|
s.runner = nil
|
||||||
|
s.cond.L.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// sendIndexTo sends file infos with a sequence number higher than prevSequence and
|
||||||
|
// returns the highest sent sequence number.
|
||||||
|
func (s *indexHandler) sendIndexTo(ctx context.Context, fset *db.FileSet) error {
|
||||||
|
initial := s.prevSequence == 0
|
||||||
|
batch := db.NewFileInfoBatch(nil)
|
||||||
|
batch.SetFlushFunc(func(fs []protocol.FileInfo) error {
|
||||||
|
l.Debugf("%v: Sending %d files (<%d bytes)", s, len(fs), batch.Size())
|
||||||
|
if initial {
|
||||||
|
initial = false
|
||||||
|
return s.conn.Index(ctx, s.folder, fs)
|
||||||
|
}
|
||||||
|
return s.conn.IndexUpdate(ctx, s.folder, fs)
|
||||||
|
})
|
||||||
|
|
||||||
|
var err error
|
||||||
|
var f protocol.FileInfo
|
||||||
|
snap, err := fset.Snapshot()
|
||||||
|
if err != nil {
|
||||||
|
return svcutil.AsFatalErr(err, svcutil.ExitError)
|
||||||
|
}
|
||||||
|
defer snap.Release()
|
||||||
|
previousWasDelete := false
|
||||||
|
snap.WithHaveSequence(s.prevSequence+1, func(fi protocol.FileIntf) bool {
|
||||||
|
// This is to make sure that renames (which is an add followed by a delete) land in the same batch.
|
||||||
|
// Even if the batch is full, we allow a last delete to slip in, we do this by making sure that
|
||||||
|
// the batch ends with a non-delete, or that the last item in the batch is already a delete
|
||||||
|
if batch.Full() && (!fi.IsDeleted() || previousWasDelete) {
|
||||||
|
if err = batch.Flush(); err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if shouldDebug() {
|
||||||
|
if fi.SequenceNo() < s.prevSequence+1 {
|
||||||
|
panic(fmt.Sprintln("sequence lower than requested, got:", fi.SequenceNo(), ", asked to start at:", s.prevSequence+1))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if f.Sequence > 0 && fi.SequenceNo() <= f.Sequence {
|
||||||
|
l.Warnln("Non-increasing sequence detected: Checking and repairing the db...")
|
||||||
|
// Abort this round of index sending - the next one will pick
|
||||||
|
// up from the last successful one with the repeaired db.
|
||||||
|
defer func() {
|
||||||
|
if fixed, dbErr := fset.RepairSequence(); dbErr != nil {
|
||||||
|
l.Warnln("Failed repairing sequence entries:", dbErr)
|
||||||
|
panic("Failed repairing sequence entries")
|
||||||
|
} else {
|
||||||
|
s.evLogger.Log(events.Failure, "detected and repaired non-increasing sequence")
|
||||||
|
l.Infof("Repaired %v sequence entries in database", fixed)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
f = fi.(protocol.FileInfo)
|
||||||
|
|
||||||
|
// If this is a folder receiving encrypted files only, we
|
||||||
|
// mustn't ever send locally changed file infos. Those aren't
|
||||||
|
// encrypted and thus would be a protocol error at the remote.
|
||||||
|
if s.folderIsReceiveEncrypted && fi.IsReceiveOnlyChanged() {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
f = prepareFileInfoForIndex(f)
|
||||||
|
|
||||||
|
previousWasDelete = f.IsDeleted()
|
||||||
|
|
||||||
|
batch.Append(f)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = batch.Flush()
|
||||||
|
|
||||||
|
// True if there was nothing to be sent
|
||||||
|
if f.Sequence == 0 {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.prevSequence = f.Sequence
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *indexHandler) receive(fs []protocol.FileInfo, update bool, op string) error {
|
||||||
|
deviceID := s.conn.ID()
|
||||||
|
|
||||||
|
s.cond.L.Lock()
|
||||||
|
paused := s.paused
|
||||||
|
fset := s.fset
|
||||||
|
runner := s.runner
|
||||||
|
s.cond.L.Unlock()
|
||||||
|
|
||||||
|
if paused {
|
||||||
|
l.Infof("%v for paused folder %q", op, s.folder)
|
||||||
|
return fmt.Errorf("%v: %w", s.folder, ErrFolderPaused)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer runner.SchedulePull()
|
||||||
|
|
||||||
|
s.downloads.Update(s.folder, makeForgetUpdate(fs))
|
||||||
|
|
||||||
|
if !update {
|
||||||
|
fset.Drop(deviceID)
|
||||||
|
}
|
||||||
|
for i := range fs {
|
||||||
|
// The local attributes should never be transmitted over the wire.
|
||||||
|
// Make sure they look like they weren't.
|
||||||
|
fs[i].LocalFlags = 0
|
||||||
|
fs[i].VersionHash = nil
|
||||||
|
}
|
||||||
|
fset.Update(deviceID, fs)
|
||||||
|
|
||||||
|
seq := fset.Sequence(deviceID)
|
||||||
|
s.evLogger.Log(events.RemoteIndexUpdated, map[string]interface{}{
|
||||||
|
"device": deviceID.String(),
|
||||||
|
"folder": s.folder,
|
||||||
|
"items": len(fs),
|
||||||
|
"sequence": seq,
|
||||||
|
"version": seq, // legacy for sequence
|
||||||
|
})
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func prepareFileInfoForIndex(f protocol.FileInfo) protocol.FileInfo {
|
||||||
|
// Mark the file as invalid if any of the local bad stuff flags are set.
|
||||||
|
f.RawInvalid = f.IsInvalid()
|
||||||
|
// If the file is marked LocalReceive (i.e., changed locally on a
|
||||||
|
// receive only folder) we do not want it to ever become the
|
||||||
|
// globally best version, invalid or not.
|
||||||
|
if f.IsReceiveOnlyChanged() {
|
||||||
|
f.Version = protocol.Vector{}
|
||||||
|
}
|
||||||
|
// never sent externally
|
||||||
|
f.LocalFlags = 0
|
||||||
|
f.VersionHash = nil
|
||||||
|
return f
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *indexHandler) String() string {
|
||||||
|
return fmt.Sprintf("indexHandler@%p for %s to %s at %s", s, s.folder, s.conn.ID().Short(), s.conn)
|
||||||
|
}
|
||||||
|
|
||||||
|
type indexHandlerRegistry struct {
|
||||||
|
sup *suture.Supervisor
|
||||||
|
evLogger events.Logger
|
||||||
|
conn protocol.Connection
|
||||||
|
downloads *deviceDownloadState
|
||||||
|
indexHandlers map[string]*indexHandler
|
||||||
|
startInfos map[string]*clusterConfigDeviceInfo
|
||||||
|
folderStates map[string]*indexHandlerFolderState
|
||||||
|
mut sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
type indexHandlerFolderState struct {
|
||||||
|
cfg config.FolderConfiguration
|
||||||
|
fset *db.FileSet
|
||||||
|
runner service
|
||||||
|
}
|
||||||
|
|
||||||
|
func newIndexHandlerRegistry(conn protocol.Connection, downloads *deviceDownloadState, closed chan struct{}, parentSup *suture.Supervisor, evLogger events.Logger) *indexHandlerRegistry {
|
||||||
|
r := &indexHandlerRegistry{
|
||||||
|
conn: conn,
|
||||||
|
downloads: downloads,
|
||||||
|
evLogger: evLogger,
|
||||||
|
indexHandlers: make(map[string]*indexHandler),
|
||||||
|
startInfos: make(map[string]*clusterConfigDeviceInfo),
|
||||||
|
folderStates: make(map[string]*indexHandlerFolderState),
|
||||||
|
mut: sync.Mutex{},
|
||||||
|
}
|
||||||
|
r.sup = suture.New(r.String(), svcutil.SpecWithDebugLogger(l))
|
||||||
|
ourToken := parentSup.Add(r.sup)
|
||||||
|
r.sup.Add(svcutil.AsService(func(ctx context.Context) error {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case <-closed:
|
||||||
|
parentSup.Remove(ourToken)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}, fmt.Sprintf("%v/waitForClosed", r)))
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *indexHandlerRegistry) String() string {
|
||||||
|
return fmt.Sprintf("indexHandlerRegistry/%v", r.conn.ID().Short())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *indexHandlerRegistry) GetSupervisor() *suture.Supervisor {
|
||||||
|
return r.sup
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *indexHandlerRegistry) startLocked(folder config.FolderConfiguration, fset *db.FileSet, runner service, startInfo *clusterConfigDeviceInfo) {
|
||||||
|
if is, ok := r.indexHandlers[folder.ID]; ok {
|
||||||
|
r.sup.RemoveAndWait(is.token, 0)
|
||||||
|
delete(r.indexHandlers, folder.ID)
|
||||||
|
}
|
||||||
|
delete(r.startInfos, folder.ID)
|
||||||
|
|
||||||
|
is := newIndexHandler(r.conn, r.downloads, folder, fset, runner, startInfo, r.evLogger)
|
||||||
|
is.token = r.sup.Add(is)
|
||||||
|
r.indexHandlers[folder.ID] = is
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddIndexInfo starts an index handler for given folder, unless it is paused.
|
||||||
|
// If it is paused, the given startInfo is stored to start the sender once the
|
||||||
|
// folder is resumed.
|
||||||
|
// If an index handler is already running, it will be stopped first.
|
||||||
|
func (r *indexHandlerRegistry) AddIndexInfo(folder string, startInfo *clusterConfigDeviceInfo) {
|
||||||
|
r.mut.Lock()
|
||||||
|
defer r.mut.Unlock()
|
||||||
|
|
||||||
|
if is, ok := r.indexHandlers[folder]; ok {
|
||||||
|
r.sup.RemoveAndWait(is.token, 0)
|
||||||
|
delete(r.indexHandlers, folder)
|
||||||
|
l.Debugf("Removed index sender for device %v and folder %v due to added pending", r.conn.ID().Short(), folder)
|
||||||
|
}
|
||||||
|
folderState, ok := r.folderStates[folder]
|
||||||
|
if !ok {
|
||||||
|
l.Debugf("Pending index handler for device %v and folder %v", r.conn.ID().Short(), folder)
|
||||||
|
r.startInfos[folder] = startInfo
|
||||||
|
return
|
||||||
|
}
|
||||||
|
r.startLocked(folderState.cfg, folderState.fset, folderState.runner, startInfo)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove stops a running index handler or removes one pending to be started.
|
||||||
|
// It is a noop if the folder isn't known.
|
||||||
|
func (r *indexHandlerRegistry) Remove(folder string) {
|
||||||
|
r.mut.Lock()
|
||||||
|
defer r.mut.Unlock()
|
||||||
|
|
||||||
|
l.Debugf("Removing index handler for device %v and folder %v", r.conn.ID().Short(), folder)
|
||||||
|
if is, ok := r.indexHandlers[folder]; ok {
|
||||||
|
r.sup.RemoveAndWait(is.token, 0)
|
||||||
|
delete(r.indexHandlers, folder)
|
||||||
|
}
|
||||||
|
delete(r.startInfos, folder)
|
||||||
|
l.Debugf("Removed index handler for device %v and folder %v", r.conn.ID().Short(), folder)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RemoveAllExcept stops all running index handlers and removes those pending to be started,
|
||||||
|
// except mentioned ones.
|
||||||
|
// It is a noop if the folder isn't known.
|
||||||
|
func (r *indexHandlerRegistry) RemoveAllExcept(except map[string]struct{}) {
|
||||||
|
r.mut.Lock()
|
||||||
|
defer r.mut.Unlock()
|
||||||
|
|
||||||
|
for folder, is := range r.indexHandlers {
|
||||||
|
if _, ok := except[folder]; !ok {
|
||||||
|
r.sup.RemoveAndWait(is.token, 0)
|
||||||
|
delete(r.indexHandlers, folder)
|
||||||
|
l.Debugf("Removed index handler for device %v and folder %v (removeAllExcept)", r.conn.ID().Short(), folder)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for folder := range r.startInfos {
|
||||||
|
if _, ok := except[folder]; !ok {
|
||||||
|
delete(r.startInfos, folder)
|
||||||
|
l.Debugf("Removed pending index handler for device %v and folder %v (removeAllExcept)", r.conn.ID().Short(), folder)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterFolderState must be called whenever something about the folder
|
||||||
|
// changes. The exception being if the folder is removed entirely, then call
|
||||||
|
// Remove. The fset and runner arguments may be nil, if given folder is paused.
|
||||||
|
func (r *indexHandlerRegistry) RegisterFolderState(folder config.FolderConfiguration, fset *db.FileSet, runner service) {
|
||||||
|
if !folder.SharedWith(r.conn.ID()) {
|
||||||
|
r.Remove(folder.ID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
r.mut.Lock()
|
||||||
|
if folder.Paused {
|
||||||
|
r.folderPausedLocked(folder.ID)
|
||||||
|
} else {
|
||||||
|
r.folderStartedLocked(folder, fset, runner)
|
||||||
|
}
|
||||||
|
r.mut.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// folderPausedLocked stops a running index handler.
|
||||||
|
// It is a noop if the folder isn't known or has not been started yet.
|
||||||
|
func (r *indexHandlerRegistry) folderPausedLocked(folder string) {
|
||||||
|
l.Debugf("Pausing index handler for device %v and folder %v", r.conn.ID().Short(), folder)
|
||||||
|
delete(r.folderStates, folder)
|
||||||
|
if is, ok := r.indexHandlers[folder]; ok {
|
||||||
|
is.pause()
|
||||||
|
l.Debugf("Paused index handler for device %v and folder %v", r.conn.ID().Short(), folder)
|
||||||
|
} else {
|
||||||
|
l.Debugf("No index handler for device %v and folder %v to pause", r.conn.ID().Short(), folder)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// folderStartedLocked resumes an already running index handler or starts it, if it
|
||||||
|
// was added while paused.
|
||||||
|
// It is a noop if the folder isn't known.
|
||||||
|
func (r *indexHandlerRegistry) folderStartedLocked(folder config.FolderConfiguration, fset *db.FileSet, runner service) {
|
||||||
|
r.folderStates[folder.ID] = &indexHandlerFolderState{
|
||||||
|
cfg: folder,
|
||||||
|
fset: fset,
|
||||||
|
runner: runner,
|
||||||
|
}
|
||||||
|
|
||||||
|
is, isOk := r.indexHandlers[folder.ID]
|
||||||
|
if info, ok := r.startInfos[folder.ID]; ok {
|
||||||
|
if isOk {
|
||||||
|
r.sup.RemoveAndWait(is.token, 0)
|
||||||
|
delete(r.indexHandlers, folder.ID)
|
||||||
|
l.Debugf("Removed index handler for device %v and folder %v in resume", r.conn.ID().Short(), folder.ID)
|
||||||
|
}
|
||||||
|
r.startLocked(folder, fset, runner, info)
|
||||||
|
delete(r.startInfos, folder.ID)
|
||||||
|
l.Debugf("Started index handler for device %v and folder %v in resume", r.conn.ID().Short(), folder.ID)
|
||||||
|
} else if isOk {
|
||||||
|
l.Debugf("Resuming index handler for device %v and folder %v", r.conn.ID().Short(), folder)
|
||||||
|
is.resume(fset, runner)
|
||||||
|
} else {
|
||||||
|
l.Debugf("Not resuming index handler for device %v and folder %v as none is paused and there is no start info", r.conn.ID().Short(), folder.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *indexHandlerRegistry) ReceiveIndex(folder string, fs []protocol.FileInfo, update bool, op string) error {
|
||||||
|
r.mut.Lock()
|
||||||
|
defer r.mut.Unlock()
|
||||||
|
is, isOk := r.indexHandlers[folder]
|
||||||
|
if !isOk {
|
||||||
|
l.Infof("%v for nonexistent or paused folder %q", op, folder)
|
||||||
|
return ErrFolderMissing
|
||||||
|
}
|
||||||
|
return is.receive(fs, update, op)
|
||||||
|
}
|
||||||
|
|
||||||
|
// makeForgetUpdate takes an index update and constructs a download progress update
|
||||||
|
// causing to forget any progress for files which we've just been sent.
|
||||||
|
func makeForgetUpdate(files []protocol.FileInfo) []protocol.FileDownloadProgressUpdate {
|
||||||
|
updates := make([]protocol.FileDownloadProgressUpdate, 0, len(files))
|
||||||
|
for _, file := range files {
|
||||||
|
if file.IsSymlink() || file.IsDirectory() || file.IsDeleted() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
updates = append(updates, protocol.FileDownloadProgressUpdate{
|
||||||
|
Name: file.Name,
|
||||||
|
Version: file.Version,
|
||||||
|
UpdateType: protocol.FileDownloadProgressUpdateTypeForget,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return updates
|
||||||
|
}
|
@ -1,430 +0,0 @@
|
|||||||
// Copyright (C) 2020 The Syncthing Authors.
|
|
||||||
//
|
|
||||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
|
||||||
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
|
||||||
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
|
||||||
|
|
||||||
package model
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/thejerf/suture/v4"
|
|
||||||
|
|
||||||
"github.com/syncthing/syncthing/lib/config"
|
|
||||||
"github.com/syncthing/syncthing/lib/db"
|
|
||||||
"github.com/syncthing/syncthing/lib/events"
|
|
||||||
"github.com/syncthing/syncthing/lib/protocol"
|
|
||||||
"github.com/syncthing/syncthing/lib/svcutil"
|
|
||||||
)
|
|
||||||
|
|
||||||
type indexSender struct {
|
|
||||||
conn protocol.Connection
|
|
||||||
folder string
|
|
||||||
folderIsReceiveEncrypted bool
|
|
||||||
fset *db.FileSet
|
|
||||||
prevSequence int64
|
|
||||||
evLogger events.Logger
|
|
||||||
connClosed chan struct{}
|
|
||||||
done chan struct{}
|
|
||||||
token suture.ServiceToken
|
|
||||||
pauseChan chan struct{}
|
|
||||||
resumeChan chan *db.FileSet
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *indexSender) Serve(ctx context.Context) (err error) {
|
|
||||||
l.Debugf("Starting indexSender for %s to %s at %s (slv=%d)", s.folder, s.conn.ID(), s.conn, s.prevSequence)
|
|
||||||
defer func() {
|
|
||||||
close(s.done)
|
|
||||||
err = svcutil.NoRestartErr(err)
|
|
||||||
l.Debugf("Exiting indexSender for %s to %s at %s: %v", s.folder, s.conn.ID(), s.conn, err)
|
|
||||||
}()
|
|
||||||
|
|
||||||
// We need to send one index, regardless of whether there is something to send or not
|
|
||||||
err = s.sendIndexTo(ctx)
|
|
||||||
|
|
||||||
// Subscribe to LocalIndexUpdated (we have new information to send) and
|
|
||||||
// DeviceDisconnected (it might be us who disconnected, so we should
|
|
||||||
// exit).
|
|
||||||
sub := s.evLogger.Subscribe(events.LocalIndexUpdated | events.DeviceDisconnected)
|
|
||||||
defer sub.Unsubscribe()
|
|
||||||
|
|
||||||
paused := false
|
|
||||||
evChan := sub.C()
|
|
||||||
ticker := time.NewTicker(time.Minute)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
for err == nil {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
case <-s.connClosed:
|
|
||||||
return nil
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
// While we have sent a sequence at least equal to the one
|
|
||||||
// currently in the database, wait for the local index to update. The
|
|
||||||
// local index may update for other folders than the one we are
|
|
||||||
// sending for.
|
|
||||||
if s.fset.Sequence(protocol.LocalDeviceID) <= s.prevSequence {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
case <-s.connClosed:
|
|
||||||
return nil
|
|
||||||
case <-evChan:
|
|
||||||
case <-ticker.C:
|
|
||||||
case <-s.pauseChan:
|
|
||||||
paused = true
|
|
||||||
case s.fset = <-s.resumeChan:
|
|
||||||
paused = false
|
|
||||||
}
|
|
||||||
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if !paused {
|
|
||||||
err = s.sendIndexTo(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait a short amount of time before entering the next loop. If there
|
|
||||||
// are continuous changes happening to the local index, this gives us
|
|
||||||
// time to batch them up a little.
|
|
||||||
time.Sleep(250 * time.Millisecond)
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *indexSender) resume(fset *db.FileSet) {
|
|
||||||
select {
|
|
||||||
case <-s.done:
|
|
||||||
case s.resumeChan <- fset:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *indexSender) pause() {
|
|
||||||
select {
|
|
||||||
case <-s.done:
|
|
||||||
case s.pauseChan <- struct{}{}:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// sendIndexTo sends file infos with a sequence number higher than prevSequence and
|
|
||||||
// returns the highest sent sequence number.
|
|
||||||
func (s *indexSender) sendIndexTo(ctx context.Context) error {
|
|
||||||
initial := s.prevSequence == 0
|
|
||||||
batch := db.NewFileInfoBatch(nil)
|
|
||||||
batch.SetFlushFunc(func(fs []protocol.FileInfo) error {
|
|
||||||
l.Debugf("%v: Sending %d files (<%d bytes)", s, len(fs), batch.Size())
|
|
||||||
if initial {
|
|
||||||
initial = false
|
|
||||||
return s.conn.Index(ctx, s.folder, fs)
|
|
||||||
}
|
|
||||||
return s.conn.IndexUpdate(ctx, s.folder, fs)
|
|
||||||
})
|
|
||||||
|
|
||||||
var err error
|
|
||||||
var f protocol.FileInfo
|
|
||||||
snap, err := s.fset.Snapshot()
|
|
||||||
if err != nil {
|
|
||||||
return svcutil.AsFatalErr(err, svcutil.ExitError)
|
|
||||||
}
|
|
||||||
defer snap.Release()
|
|
||||||
previousWasDelete := false
|
|
||||||
snap.WithHaveSequence(s.prevSequence+1, func(fi protocol.FileIntf) bool {
|
|
||||||
// This is to make sure that renames (which is an add followed by a delete) land in the same batch.
|
|
||||||
// Even if the batch is full, we allow a last delete to slip in, we do this by making sure that
|
|
||||||
// the batch ends with a non-delete, or that the last item in the batch is already a delete
|
|
||||||
if batch.Full() && (!fi.IsDeleted() || previousWasDelete) {
|
|
||||||
if err = batch.Flush(); err != nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if shouldDebug() {
|
|
||||||
if fi.SequenceNo() < s.prevSequence+1 {
|
|
||||||
panic(fmt.Sprintln("sequence lower than requested, got:", fi.SequenceNo(), ", asked to start at:", s.prevSequence+1))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if f.Sequence > 0 && fi.SequenceNo() <= f.Sequence {
|
|
||||||
l.Warnln("Non-increasing sequence detected: Checking and repairing the db...")
|
|
||||||
// Abort this round of index sending - the next one will pick
|
|
||||||
// up from the last successful one with the repeaired db.
|
|
||||||
defer func() {
|
|
||||||
if fixed, dbErr := s.fset.RepairSequence(); dbErr != nil {
|
|
||||||
l.Warnln("Failed repairing sequence entries:", dbErr)
|
|
||||||
panic("Failed repairing sequence entries")
|
|
||||||
} else {
|
|
||||||
s.evLogger.Log(events.Failure, "detected and repaired non-increasing sequence")
|
|
||||||
l.Infof("Repaired %v sequence entries in database", fixed)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
f = fi.(protocol.FileInfo)
|
|
||||||
|
|
||||||
// If this is a folder receiving encrypted files only, we
|
|
||||||
// mustn't ever send locally changed file infos. Those aren't
|
|
||||||
// encrypted and thus would be a protocol error at the remote.
|
|
||||||
if s.folderIsReceiveEncrypted && fi.IsReceiveOnlyChanged() {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
f = prepareFileInfoForIndex(f)
|
|
||||||
|
|
||||||
previousWasDelete = f.IsDeleted()
|
|
||||||
|
|
||||||
batch.Append(f)
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = batch.Flush()
|
|
||||||
|
|
||||||
// True if there was nothing to be sent
|
|
||||||
if f.Sequence == 0 {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
s.prevSequence = f.Sequence
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func prepareFileInfoForIndex(f protocol.FileInfo) protocol.FileInfo {
|
|
||||||
// Mark the file as invalid if any of the local bad stuff flags are set.
|
|
||||||
f.RawInvalid = f.IsInvalid()
|
|
||||||
// If the file is marked LocalReceive (i.e., changed locally on a
|
|
||||||
// receive only folder) we do not want it to ever become the
|
|
||||||
// globally best version, invalid or not.
|
|
||||||
if f.IsReceiveOnlyChanged() {
|
|
||||||
f.Version = protocol.Vector{}
|
|
||||||
}
|
|
||||||
// never sent externally
|
|
||||||
f.LocalFlags = 0
|
|
||||||
f.VersionHash = nil
|
|
||||||
return f
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *indexSender) String() string {
|
|
||||||
return fmt.Sprintf("indexSender@%p for %s to %s at %s", s, s.folder, s.conn.ID(), s.conn)
|
|
||||||
}
|
|
||||||
|
|
||||||
type indexSenderRegistry struct {
|
|
||||||
deviceID protocol.DeviceID
|
|
||||||
sup *suture.Supervisor
|
|
||||||
evLogger events.Logger
|
|
||||||
conn protocol.Connection
|
|
||||||
closed chan struct{}
|
|
||||||
indexSenders map[string]*indexSender
|
|
||||||
startInfos map[string]*indexSenderStartInfo
|
|
||||||
mut sync.Mutex
|
|
||||||
}
|
|
||||||
|
|
||||||
func newIndexSenderRegistry(conn protocol.Connection, closed chan struct{}, sup *suture.Supervisor, evLogger events.Logger) *indexSenderRegistry {
|
|
||||||
return &indexSenderRegistry{
|
|
||||||
deviceID: conn.ID(),
|
|
||||||
conn: conn,
|
|
||||||
closed: closed,
|
|
||||||
sup: sup,
|
|
||||||
evLogger: evLogger,
|
|
||||||
indexSenders: make(map[string]*indexSender),
|
|
||||||
startInfos: make(map[string]*indexSenderStartInfo),
|
|
||||||
mut: sync.Mutex{},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// add starts an index sender for given folder.
|
|
||||||
// If an index sender is already running, it will be stopped first.
|
|
||||||
func (r *indexSenderRegistry) add(folder config.FolderConfiguration, fset *db.FileSet, startInfo *indexSenderStartInfo) {
|
|
||||||
r.mut.Lock()
|
|
||||||
r.addLocked(folder, fset, startInfo)
|
|
||||||
l.Debugf("Started index sender for device %v and folder %v", r.deviceID.Short(), folder.ID)
|
|
||||||
r.mut.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *indexSenderRegistry) addLocked(folder config.FolderConfiguration, fset *db.FileSet, startInfo *indexSenderStartInfo) {
|
|
||||||
myIndexID := fset.IndexID(protocol.LocalDeviceID)
|
|
||||||
mySequence := fset.Sequence(protocol.LocalDeviceID)
|
|
||||||
var startSequence int64
|
|
||||||
|
|
||||||
// This is the other side's description of what it knows
|
|
||||||
// about us. Lets check to see if we can start sending index
|
|
||||||
// updates directly or need to send the index from start...
|
|
||||||
|
|
||||||
if startInfo.local.IndexID == myIndexID {
|
|
||||||
// They say they've seen our index ID before, so we can
|
|
||||||
// send a delta update only.
|
|
||||||
|
|
||||||
if startInfo.local.MaxSequence > mySequence {
|
|
||||||
// Safety check. They claim to have more or newer
|
|
||||||
// index data than we have - either we have lost
|
|
||||||
// index data, or reset the index without resetting
|
|
||||||
// the IndexID, or something else weird has
|
|
||||||
// happened. We send a full index to reset the
|
|
||||||
// situation.
|
|
||||||
l.Infof("Device %v folder %s is delta index compatible, but seems out of sync with reality", r.deviceID, folder.Description())
|
|
||||||
startSequence = 0
|
|
||||||
} else {
|
|
||||||
l.Debugf("Device %v folder %s is delta index compatible (mlv=%d)", r.deviceID, folder.Description(), startInfo.local.MaxSequence)
|
|
||||||
startSequence = startInfo.local.MaxSequence
|
|
||||||
}
|
|
||||||
} else if startInfo.local.IndexID != 0 {
|
|
||||||
// They say they've seen an index ID from us, but it's
|
|
||||||
// not the right one. Either they are confused or we
|
|
||||||
// must have reset our database since last talking to
|
|
||||||
// them. We'll start with a full index transfer.
|
|
||||||
l.Infof("Device %v folder %s has mismatching index ID for us (%v != %v)", r.deviceID, folder.Description(), startInfo.local.IndexID, myIndexID)
|
|
||||||
startSequence = 0
|
|
||||||
} else {
|
|
||||||
l.Debugf("Device %v folder %s has no index ID for us", r.deviceID, folder.Description())
|
|
||||||
}
|
|
||||||
|
|
||||||
// This is the other side's description of themselves. We
|
|
||||||
// check to see that it matches the IndexID we have on file,
|
|
||||||
// otherwise we drop our old index data and expect to get a
|
|
||||||
// completely new set.
|
|
||||||
|
|
||||||
theirIndexID := fset.IndexID(r.deviceID)
|
|
||||||
if startInfo.remote.IndexID == 0 {
|
|
||||||
// They're not announcing an index ID. This means they
|
|
||||||
// do not support delta indexes and we should clear any
|
|
||||||
// information we have from them before accepting their
|
|
||||||
// index, which will presumably be a full index.
|
|
||||||
l.Debugf("Device %v folder %s does not announce an index ID", r.deviceID, folder.Description())
|
|
||||||
fset.Drop(r.deviceID)
|
|
||||||
} else if startInfo.remote.IndexID != theirIndexID {
|
|
||||||
// The index ID we have on file is not what they're
|
|
||||||
// announcing. They must have reset their database and
|
|
||||||
// will probably send us a full index. We drop any
|
|
||||||
// information we have and remember this new index ID
|
|
||||||
// instead.
|
|
||||||
l.Infof("Device %v folder %s has a new index ID (%v)", r.deviceID, folder.Description(), startInfo.remote.IndexID)
|
|
||||||
fset.Drop(r.deviceID)
|
|
||||||
fset.SetIndexID(r.deviceID, startInfo.remote.IndexID)
|
|
||||||
}
|
|
||||||
|
|
||||||
if is, ok := r.indexSenders[folder.ID]; ok {
|
|
||||||
r.sup.RemoveAndWait(is.token, 0)
|
|
||||||
delete(r.indexSenders, folder.ID)
|
|
||||||
}
|
|
||||||
delete(r.startInfos, folder.ID)
|
|
||||||
|
|
||||||
is := &indexSender{
|
|
||||||
conn: r.conn,
|
|
||||||
connClosed: r.closed,
|
|
||||||
done: make(chan struct{}),
|
|
||||||
folder: folder.ID,
|
|
||||||
folderIsReceiveEncrypted: folder.Type == config.FolderTypeReceiveEncrypted,
|
|
||||||
fset: fset,
|
|
||||||
prevSequence: startSequence,
|
|
||||||
evLogger: r.evLogger,
|
|
||||||
pauseChan: make(chan struct{}),
|
|
||||||
resumeChan: make(chan *db.FileSet),
|
|
||||||
}
|
|
||||||
is.token = r.sup.Add(is)
|
|
||||||
r.indexSenders[folder.ID] = is
|
|
||||||
}
|
|
||||||
|
|
||||||
// addPending stores the given info to start an index sender once resume is called
|
|
||||||
// for this folder.
|
|
||||||
// If an index sender is already running, it will be stopped.
|
|
||||||
func (r *indexSenderRegistry) addPending(folder string, startInfo *indexSenderStartInfo) {
|
|
||||||
r.mut.Lock()
|
|
||||||
defer r.mut.Unlock()
|
|
||||||
|
|
||||||
if is, ok := r.indexSenders[folder]; ok {
|
|
||||||
r.sup.RemoveAndWait(is.token, 0)
|
|
||||||
delete(r.indexSenders, folder)
|
|
||||||
l.Debugf("Removed index sender for device %v and folder %v due to added pending", r.deviceID.Short(), folder)
|
|
||||||
}
|
|
||||||
r.startInfos[folder] = startInfo
|
|
||||||
l.Debugf("Pending index sender for device %v and folder %v", r.deviceID.Short(), folder)
|
|
||||||
}
|
|
||||||
|
|
||||||
// remove stops a running index sender or removes one pending to be started.
|
|
||||||
// It is a noop if the folder isn't known.
|
|
||||||
func (r *indexSenderRegistry) remove(folder string) {
|
|
||||||
r.mut.Lock()
|
|
||||||
defer r.mut.Unlock()
|
|
||||||
|
|
||||||
if is, ok := r.indexSenders[folder]; ok {
|
|
||||||
r.sup.RemoveAndWait(is.token, 0)
|
|
||||||
delete(r.indexSenders, folder)
|
|
||||||
}
|
|
||||||
delete(r.startInfos, folder)
|
|
||||||
l.Debugf("Removed index sender for device %v and folder %v", r.deviceID.Short(), folder)
|
|
||||||
}
|
|
||||||
|
|
||||||
// removeAllExcept stops all running index senders and removes those pending to be started,
|
|
||||||
// except mentioned ones.
|
|
||||||
// It is a noop if the folder isn't known.
|
|
||||||
func (r *indexSenderRegistry) removeAllExcept(except map[string]struct{}) {
|
|
||||||
r.mut.Lock()
|
|
||||||
defer r.mut.Unlock()
|
|
||||||
|
|
||||||
for folder, is := range r.indexSenders {
|
|
||||||
if _, ok := except[folder]; !ok {
|
|
||||||
r.sup.RemoveAndWait(is.token, 0)
|
|
||||||
delete(r.indexSenders, folder)
|
|
||||||
l.Debugf("Removed index sender for device %v and folder %v (removeAllExcept)", r.deviceID.Short(), folder)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for folder := range r.startInfos {
|
|
||||||
if _, ok := except[folder]; !ok {
|
|
||||||
delete(r.startInfos, folder)
|
|
||||||
l.Debugf("Removed pending index sender for device %v and folder %v (removeAllExcept)", r.deviceID.Short(), folder)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// pause stops a running index sender.
|
|
||||||
// It is a noop if the folder isn't known or has not been started yet.
|
|
||||||
func (r *indexSenderRegistry) pause(folder string) {
|
|
||||||
r.mut.Lock()
|
|
||||||
defer r.mut.Unlock()
|
|
||||||
|
|
||||||
if is, ok := r.indexSenders[folder]; ok {
|
|
||||||
is.pause()
|
|
||||||
l.Debugf("Paused index sender for device %v and folder %v", r.deviceID.Short(), folder)
|
|
||||||
} else {
|
|
||||||
l.Debugf("No index sender for device %v and folder %v to pause", r.deviceID.Short(), folder)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// resume unpauses an already running index sender or starts it, if it was added
|
|
||||||
// while paused.
|
|
||||||
// It is a noop if the folder isn't known.
|
|
||||||
func (r *indexSenderRegistry) resume(folder config.FolderConfiguration, fset *db.FileSet) {
|
|
||||||
r.mut.Lock()
|
|
||||||
defer r.mut.Unlock()
|
|
||||||
|
|
||||||
is, isOk := r.indexSenders[folder.ID]
|
|
||||||
if info, ok := r.startInfos[folder.ID]; ok {
|
|
||||||
if isOk {
|
|
||||||
r.sup.RemoveAndWait(is.token, 0)
|
|
||||||
delete(r.indexSenders, folder.ID)
|
|
||||||
l.Debugf("Removed index sender for device %v and folder %v in resume", r.deviceID.Short(), folder.ID)
|
|
||||||
}
|
|
||||||
r.addLocked(folder, fset, info)
|
|
||||||
delete(r.startInfos, folder.ID)
|
|
||||||
l.Debugf("Started index sender for device %v and folder %v in resume", r.deviceID.Short(), folder.ID)
|
|
||||||
} else if isOk {
|
|
||||||
is.resume(fset)
|
|
||||||
l.Debugf("Resume index sender for device %v and folder %v", r.deviceID.Short(), folder.ID)
|
|
||||||
} else {
|
|
||||||
l.Debugf("Not resuming index sender for device %v and folder %v as none is paused and there is no start info", r.deviceID.Short(), folder.ID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type indexSenderStartInfo struct {
|
|
||||||
local, remote protocol.Device
|
|
||||||
}
|
|
@ -158,7 +158,7 @@ type model struct {
|
|||||||
helloMessages map[protocol.DeviceID]protocol.Hello
|
helloMessages map[protocol.DeviceID]protocol.Hello
|
||||||
deviceDownloads map[protocol.DeviceID]*deviceDownloadState
|
deviceDownloads map[protocol.DeviceID]*deviceDownloadState
|
||||||
remotePausedFolders map[protocol.DeviceID]map[string]struct{} // deviceID -> folders
|
remotePausedFolders map[protocol.DeviceID]map[string]struct{} // deviceID -> folders
|
||||||
indexSenders map[protocol.DeviceID]*indexSenderRegistry
|
indexHandlers map[protocol.DeviceID]*indexHandlerRegistry
|
||||||
|
|
||||||
// for testing only
|
// for testing only
|
||||||
foldersRunning int32
|
foldersRunning int32
|
||||||
@ -246,7 +246,7 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio
|
|||||||
helloMessages: make(map[protocol.DeviceID]protocol.Hello),
|
helloMessages: make(map[protocol.DeviceID]protocol.Hello),
|
||||||
deviceDownloads: make(map[protocol.DeviceID]*deviceDownloadState),
|
deviceDownloads: make(map[protocol.DeviceID]*deviceDownloadState),
|
||||||
remotePausedFolders: make(map[protocol.DeviceID]map[string]struct{}),
|
remotePausedFolders: make(map[protocol.DeviceID]map[string]struct{}),
|
||||||
indexSenders: make(map[protocol.DeviceID]*indexSenderRegistry),
|
indexHandlers: make(map[protocol.DeviceID]*indexHandlerRegistry),
|
||||||
}
|
}
|
||||||
for devID := range cfg.Devices() {
|
for devID := range cfg.Devices() {
|
||||||
m.deviceStatRefs[devID] = stats.NewDeviceStatisticsReference(m.db, devID)
|
m.deviceStatRefs[devID] = stats.NewDeviceStatisticsReference(m.db, devID)
|
||||||
@ -485,8 +485,8 @@ func (m *model) removeFolder(cfg config.FolderConfiguration) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
m.cleanupFolderLocked(cfg)
|
m.cleanupFolderLocked(cfg)
|
||||||
for _, r := range m.indexSenders {
|
for _, r := range m.indexHandlers {
|
||||||
r.remove(cfg.ID)
|
r.Remove(cfg.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
m.fmut.Unlock()
|
m.fmut.Unlock()
|
||||||
@ -558,21 +558,9 @@ func (m *model) restartFolder(from, to config.FolderConfiguration, cacheIgnoredF
|
|||||||
|
|
||||||
// Care needs to be taken because we already hold fmut and the lock order
|
// Care needs to be taken because we already hold fmut and the lock order
|
||||||
// must be the same everywhere. As fmut is acquired first, this is fine.
|
// must be the same everywhere. As fmut is acquired first, this is fine.
|
||||||
// toDeviceIDs := to.DeviceIDs()
|
|
||||||
m.pmut.RLock()
|
m.pmut.RLock()
|
||||||
for _, id := range to.DeviceIDs() {
|
for _, indexRegistry := range m.indexHandlers {
|
||||||
indexSenders, ok := m.indexSenders[id]
|
indexRegistry.RegisterFolderState(to, fset, m.folderRunners[to.ID])
|
||||||
if !ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// In case the folder was newly shared with us we already got a
|
|
||||||
// cluster config and wont necessarily get another soon - start
|
|
||||||
// sending indexes if connected.
|
|
||||||
if to.Paused {
|
|
||||||
indexSenders.pause(to.ID)
|
|
||||||
} else if !from.SharedWith(indexSenders.deviceID) || fsetNil || from.Paused {
|
|
||||||
indexSenders.resume(to, fset)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
m.pmut.RUnlock()
|
m.pmut.RUnlock()
|
||||||
|
|
||||||
@ -601,18 +589,19 @@ func (m *model) newFolder(cfg config.FolderConfiguration, cacheIgnoredFiles bool
|
|||||||
m.fmut.Lock()
|
m.fmut.Lock()
|
||||||
defer m.fmut.Unlock()
|
defer m.fmut.Unlock()
|
||||||
|
|
||||||
|
m.addAndStartFolderLocked(cfg, fset, cacheIgnoredFiles)
|
||||||
|
|
||||||
// Cluster configs might be received and processed before reaching this
|
// Cluster configs might be received and processed before reaching this
|
||||||
// point, i.e. before the folder is started. If that's the case, start
|
// point, i.e. before the folder is started. If that's the case, start
|
||||||
// index senders here.
|
// index senders here.
|
||||||
|
// Care needs to be taken because we already hold fmut and the lock order
|
||||||
|
// must be the same everywhere. As fmut is acquired first, this is fine.
|
||||||
m.pmut.RLock()
|
m.pmut.RLock()
|
||||||
for _, id := range cfg.DeviceIDs() {
|
for _, indexRegistry := range m.indexHandlers {
|
||||||
if is, ok := m.indexSenders[id]; ok {
|
indexRegistry.RegisterFolderState(cfg, fset, m.folderRunners[cfg.ID])
|
||||||
is.resume(cfg, fset)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
m.pmut.RUnlock()
|
m.pmut.RUnlock()
|
||||||
|
|
||||||
m.addAndStartFolderLocked(cfg, fset, cacheIgnoredFiles)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1151,46 +1140,23 @@ func (m *model) handleIndex(deviceID protocol.DeviceID, folder string, fs []prot
|
|||||||
return errors.Wrap(ErrFolderPaused, folder)
|
return errors.Wrap(ErrFolderPaused, folder)
|
||||||
}
|
}
|
||||||
|
|
||||||
m.fmut.RLock()
|
|
||||||
files, existing := m.folderFiles[folder]
|
|
||||||
runner, running := m.folderRunners[folder]
|
|
||||||
m.fmut.RUnlock()
|
|
||||||
|
|
||||||
if !existing {
|
|
||||||
l.Infof("%v for nonexistent folder %q", op, folder)
|
|
||||||
return errors.Wrap(ErrFolderMissing, folder)
|
|
||||||
}
|
|
||||||
|
|
||||||
if running {
|
|
||||||
defer runner.SchedulePull()
|
|
||||||
}
|
|
||||||
|
|
||||||
m.pmut.RLock()
|
m.pmut.RLock()
|
||||||
downloads := m.deviceDownloads[deviceID]
|
indexHandler, ok := m.indexHandlers[deviceID]
|
||||||
m.pmut.RUnlock()
|
m.pmut.RUnlock()
|
||||||
downloads.Update(folder, makeForgetUpdate(fs))
|
if !ok {
|
||||||
|
// This should be impossible, as an index handler always exists for an
|
||||||
if !update {
|
// open connection, and this method can't be called on a closed
|
||||||
files.Drop(deviceID)
|
// connection
|
||||||
|
m.evLogger.Log(events.Failure, "index sender does not exist for connection on which indexes were received")
|
||||||
|
l.Debugf("%v for folder (ID %q) sent from device %q: missing index handler", op, folder, deviceID)
|
||||||
|
return errors.Wrap(errors.New("index handler missing"), folder)
|
||||||
}
|
}
|
||||||
for i := range fs {
|
|
||||||
// The local attributes should never be transmitted over the wire.
|
|
||||||
// Make sure they look like they weren't.
|
|
||||||
fs[i].LocalFlags = 0
|
|
||||||
fs[i].VersionHash = nil
|
|
||||||
}
|
|
||||||
files.Update(deviceID, fs)
|
|
||||||
|
|
||||||
seq := files.Sequence(deviceID)
|
return indexHandler.ReceiveIndex(folder, fs, update, op)
|
||||||
m.evLogger.Log(events.RemoteIndexUpdated, map[string]interface{}{
|
}
|
||||||
"device": deviceID.String(),
|
|
||||||
"folder": folder,
|
|
||||||
"items": len(fs),
|
|
||||||
"sequence": seq,
|
|
||||||
"version": seq, // legacy for sequence
|
|
||||||
})
|
|
||||||
|
|
||||||
return nil
|
type clusterConfigDeviceInfo struct {
|
||||||
|
local, remote protocol.Device
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterConfig) error {
|
func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterConfig) error {
|
||||||
@ -1199,8 +1165,10 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon
|
|||||||
// Also, collect a list of folders we do share, and if he's interested in
|
// Also, collect a list of folders we do share, and if he's interested in
|
||||||
// temporary indexes, subscribe the connection.
|
// temporary indexes, subscribe the connection.
|
||||||
|
|
||||||
|
l.Debugf("Handling ClusterConfig from %v", deviceID.Short())
|
||||||
|
|
||||||
m.pmut.RLock()
|
m.pmut.RLock()
|
||||||
indexSenderRegistry, ok := m.indexSenders[deviceID]
|
indexHandlerRegistry, ok := m.indexHandlers[deviceID]
|
||||||
m.pmut.RUnlock()
|
m.pmut.RUnlock()
|
||||||
if !ok {
|
if !ok {
|
||||||
panic("bug: ClusterConfig called on closed or nonexistent connection")
|
panic("bug: ClusterConfig called on closed or nonexistent connection")
|
||||||
@ -1214,9 +1182,9 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon
|
|||||||
|
|
||||||
// Assemble the device information from the connected device about
|
// Assemble the device information from the connected device about
|
||||||
// themselves and us for all folders.
|
// themselves and us for all folders.
|
||||||
ccDeviceInfos := make(map[string]*indexSenderStartInfo, len(cm.Folders))
|
ccDeviceInfos := make(map[string]*clusterConfigDeviceInfo, len(cm.Folders))
|
||||||
for _, folder := range cm.Folders {
|
for _, folder := range cm.Folders {
|
||||||
info := &indexSenderStartInfo{}
|
info := &clusterConfigDeviceInfo{}
|
||||||
for _, dev := range folder.Devices {
|
for _, dev := range folder.Devices {
|
||||||
if dev.ID == m.id {
|
if dev.ID == m.id {
|
||||||
info.local = dev
|
info.local = dev
|
||||||
@ -1269,7 +1237,7 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon
|
|||||||
w.Wait()
|
w.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
tempIndexFolders, paused, err := m.ccHandleFolders(cm.Folders, deviceCfg, ccDeviceInfos, indexSenderRegistry)
|
tempIndexFolders, paused, err := m.ccHandleFolders(cm.Folders, deviceCfg, ccDeviceInfos, indexHandlerRegistry)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -1310,7 +1278,7 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *model) ccHandleFolders(folders []protocol.Folder, deviceCfg config.DeviceConfiguration, ccDeviceInfos map[string]*indexSenderStartInfo, indexSenders *indexSenderRegistry) ([]string, map[string]struct{}, error) {
|
func (m *model) ccHandleFolders(folders []protocol.Folder, deviceCfg config.DeviceConfiguration, ccDeviceInfos map[string]*clusterConfigDeviceInfo, indexHandlers *indexHandlerRegistry) ([]string, map[string]struct{}, error) {
|
||||||
var folderDevice config.FolderDeviceConfiguration
|
var folderDevice config.FolderDeviceConfiguration
|
||||||
tempIndexFolders := make([]string, 0, len(folders))
|
tempIndexFolders := make([]string, 0, len(folders))
|
||||||
paused := make(map[string]struct{}, len(folders))
|
paused := make(map[string]struct{}, len(folders))
|
||||||
@ -1330,7 +1298,7 @@ func (m *model) ccHandleFolders(folders []protocol.Folder, deviceCfg config.Devi
|
|||||||
folderDevice, ok = cfg.Device(deviceID)
|
folderDevice, ok = cfg.Device(deviceID)
|
||||||
}
|
}
|
||||||
if !ok {
|
if !ok {
|
||||||
indexSenders.remove(folder.ID)
|
indexHandlers.Remove(folder.ID)
|
||||||
if deviceCfg.IgnoredFolder(folder.ID) {
|
if deviceCfg.IgnoredFolder(folder.ID) {
|
||||||
l.Infof("Ignoring folder %s from device %s since we are configured to", folder.Description(), deviceID)
|
l.Infof("Ignoring folder %s from device %s since we are configured to", folder.Description(), deviceID)
|
||||||
continue
|
continue
|
||||||
@ -1341,7 +1309,7 @@ func (m *model) ccHandleFolders(folders []protocol.Folder, deviceCfg config.Devi
|
|||||||
if err := m.db.AddOrUpdatePendingFolder(folder.ID, of, deviceID); err != nil {
|
if err := m.db.AddOrUpdatePendingFolder(folder.ID, of, deviceID); err != nil {
|
||||||
l.Warnf("Failed to persist pending folder entry to database: %v", err)
|
l.Warnf("Failed to persist pending folder entry to database: %v", err)
|
||||||
}
|
}
|
||||||
indexSenders.addPending(folder.ID, ccDeviceInfos[folder.ID])
|
indexHandlers.AddIndexInfo(folder.ID, ccDeviceInfos[folder.ID])
|
||||||
updatedPending = append(updatedPending, updatedPendingFolder{
|
updatedPending = append(updatedPending, updatedPendingFolder{
|
||||||
FolderID: folder.ID,
|
FolderID: folder.ID,
|
||||||
FolderLabel: folder.Label,
|
FolderLabel: folder.Label,
|
||||||
@ -1359,13 +1327,13 @@ func (m *model) ccHandleFolders(folders []protocol.Folder, deviceCfg config.Devi
|
|||||||
}
|
}
|
||||||
|
|
||||||
if folder.Paused {
|
if folder.Paused {
|
||||||
indexSenders.remove(folder.ID)
|
indexHandlers.Remove(folder.ID)
|
||||||
paused[cfg.ID] = struct{}{}
|
paused[cfg.ID] = struct{}{}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if cfg.Paused {
|
if cfg.Paused {
|
||||||
indexSenders.addPending(folder.ID, ccDeviceInfos[folder.ID])
|
indexHandlers.AddIndexInfo(folder.ID, ccDeviceInfos[folder.ID])
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1403,29 +1371,10 @@ func (m *model) ccHandleFolders(folders []protocol.Folder, deviceCfg config.Devi
|
|||||||
tempIndexFolders = append(tempIndexFolders, folder.ID)
|
tempIndexFolders = append(tempIndexFolders, folder.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
m.fmut.RLock()
|
indexHandlers.AddIndexInfo(folder.ID, ccDeviceInfos[folder.ID])
|
||||||
fs, ok := m.folderFiles[folder.ID]
|
|
||||||
m.fmut.RUnlock()
|
|
||||||
if !ok {
|
|
||||||
// Shouldn't happen because !cfg.Paused, but might happen
|
|
||||||
// if the folder is about to be unpaused, but not yet.
|
|
||||||
l.Debugln("ccH: no fset", folder.ID)
|
|
||||||
indexSenders.addPending(folder.ID, ccDeviceInfos[folder.ID])
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
|
||||||
indexSenders.add(cfg, fs, ccDeviceInfos[folder.ID])
|
indexHandlers.RemoveAllExcept(seenFolders)
|
||||||
|
|
||||||
// We might already have files that we need to pull so let the
|
|
||||||
// folder runner know that it should recheck the index data.
|
|
||||||
m.fmut.RLock()
|
|
||||||
if runner := m.folderRunners[folder.ID]; runner != nil {
|
|
||||||
defer runner.SchedulePull()
|
|
||||||
}
|
|
||||||
m.fmut.RUnlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
indexSenders.removeAllExcept(seenFolders)
|
|
||||||
for folder := range expiredPending {
|
for folder := range expiredPending {
|
||||||
m.db.RemovePendingFolderForDevice(folder, deviceID)
|
m.db.RemovePendingFolderForDevice(folder, deviceID)
|
||||||
}
|
}
|
||||||
@ -1446,7 +1395,7 @@ func (m *model) ccHandleFolders(folders []protocol.Folder, deviceCfg config.Devi
|
|||||||
return tempIndexFolders, paused, nil
|
return tempIndexFolders, paused, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *model) ccCheckEncryption(fcfg config.FolderConfiguration, folderDevice config.FolderDeviceConfiguration, ccDeviceInfos *indexSenderStartInfo, deviceUntrusted bool) error {
|
func (m *model) ccCheckEncryption(fcfg config.FolderConfiguration, folderDevice config.FolderDeviceConfiguration, ccDeviceInfos *clusterConfigDeviceInfo, deviceUntrusted bool) error {
|
||||||
hasTokenRemote := len(ccDeviceInfos.remote.EncryptionPasswordToken) > 0
|
hasTokenRemote := len(ccDeviceInfos.remote.EncryptionPasswordToken) > 0
|
||||||
hasTokenLocal := len(ccDeviceInfos.local.EncryptionPasswordToken) > 0
|
hasTokenLocal := len(ccDeviceInfos.local.EncryptionPasswordToken) > 0
|
||||||
isEncryptedRemote := folderDevice.EncryptionPassword != ""
|
isEncryptedRemote := folderDevice.EncryptionPassword != ""
|
||||||
@ -1688,7 +1637,7 @@ func (m *model) handleDeintroductions(introducerCfg config.DeviceConfiguration,
|
|||||||
|
|
||||||
// handleAutoAccepts handles adding and sharing folders for devices that have
|
// handleAutoAccepts handles adding and sharing folders for devices that have
|
||||||
// AutoAcceptFolders set to true.
|
// AutoAcceptFolders set to true.
|
||||||
func (m *model) handleAutoAccepts(deviceID protocol.DeviceID, folder protocol.Folder, ccDeviceInfos *indexSenderStartInfo, cfg config.FolderConfiguration, haveCfg bool, defaultPath string) (config.FolderConfiguration, bool) {
|
func (m *model) handleAutoAccepts(deviceID protocol.DeviceID, folder protocol.Folder, ccDeviceInfos *clusterConfigDeviceInfo, cfg config.FolderConfiguration, haveCfg bool, defaultPath string) (config.FolderConfiguration, bool) {
|
||||||
if !haveCfg {
|
if !haveCfg {
|
||||||
defaultPathFs := fs.NewFilesystem(fs.FilesystemTypeBasic, defaultPath)
|
defaultPathFs := fs.NewFilesystem(fs.FilesystemTypeBasic, defaultPath)
|
||||||
pathAlternatives := []string{
|
pathAlternatives := []string{
|
||||||
@ -1783,7 +1732,7 @@ func (m *model) Closed(device protocol.DeviceID, err error) {
|
|||||||
delete(m.remotePausedFolders, device)
|
delete(m.remotePausedFolders, device)
|
||||||
closed := m.closed[device]
|
closed := m.closed[device]
|
||||||
delete(m.closed, device)
|
delete(m.closed, device)
|
||||||
delete(m.indexSenders, device)
|
delete(m.indexHandlers, device)
|
||||||
m.pmut.Unlock()
|
m.pmut.Unlock()
|
||||||
|
|
||||||
m.progressEmitter.temporaryIndexUnsubscribe(conn)
|
m.progressEmitter.temporaryIndexUnsubscribe(conn)
|
||||||
@ -2244,6 +2193,9 @@ func (m *model) AddConnection(conn protocol.Connection, hello protocol.Hello) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The slightly unusual locking sequence here is because we must acquire
|
||||||
|
// fmut before pmut. (The locks can be *released* in any order.)
|
||||||
|
m.fmut.RLock()
|
||||||
m.pmut.Lock()
|
m.pmut.Lock()
|
||||||
if oldConn, ok := m.conn[deviceID]; ok {
|
if oldConn, ok := m.conn[deviceID]; ok {
|
||||||
l.Infoln("Replacing old connection", oldConn, "with", conn, "for", deviceID)
|
l.Infoln("Replacing old connection", oldConn, "with", conn, "for", deviceID)
|
||||||
@ -2253,9 +2205,12 @@ func (m *model) AddConnection(conn protocol.Connection, hello protocol.Hello) {
|
|||||||
// actual close without holding pmut as the connection will call
|
// actual close without holding pmut as the connection will call
|
||||||
// back into Closed() for the cleanup.
|
// back into Closed() for the cleanup.
|
||||||
closed := m.closed[deviceID]
|
closed := m.closed[deviceID]
|
||||||
|
m.fmut.RUnlock()
|
||||||
m.pmut.Unlock()
|
m.pmut.Unlock()
|
||||||
oldConn.Close(errReplacingConnection)
|
oldConn.Close(errReplacingConnection)
|
||||||
<-closed
|
<-closed
|
||||||
|
// Again, lock fmut before pmut.
|
||||||
|
m.fmut.RLock()
|
||||||
m.pmut.Lock()
|
m.pmut.Lock()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2263,7 +2218,12 @@ func (m *model) AddConnection(conn protocol.Connection, hello protocol.Hello) {
|
|||||||
closed := make(chan struct{})
|
closed := make(chan struct{})
|
||||||
m.closed[deviceID] = closed
|
m.closed[deviceID] = closed
|
||||||
m.deviceDownloads[deviceID] = newDeviceDownloadState()
|
m.deviceDownloads[deviceID] = newDeviceDownloadState()
|
||||||
m.indexSenders[deviceID] = newIndexSenderRegistry(conn, closed, m.Supervisor, m.evLogger)
|
indexRegistry := newIndexHandlerRegistry(conn, m.deviceDownloads[deviceID], closed, m.Supervisor, m.evLogger)
|
||||||
|
for id, fcfg := range m.folderCfgs {
|
||||||
|
indexRegistry.RegisterFolderState(fcfg, m.folderFiles[id], m.folderRunners[id])
|
||||||
|
}
|
||||||
|
m.indexHandlers[deviceID] = indexRegistry
|
||||||
|
m.fmut.RUnlock()
|
||||||
// 0: default, <0: no limiting
|
// 0: default, <0: no limiting
|
||||||
switch {
|
switch {
|
||||||
case device.MaxRequestKiB > 0:
|
case device.MaxRequestKiB > 0:
|
||||||
@ -3119,23 +3079,6 @@ func readOffsetIntoBuf(fs fs.Filesystem, file string, offset int64, buf []byte)
|
|||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// makeForgetUpdate takes an index update and constructs a download progress update
|
|
||||||
// causing to forget any progress for files which we've just been sent.
|
|
||||||
func makeForgetUpdate(files []protocol.FileInfo) []protocol.FileDownloadProgressUpdate {
|
|
||||||
updates := make([]protocol.FileDownloadProgressUpdate, 0, len(files))
|
|
||||||
for _, file := range files {
|
|
||||||
if file.IsSymlink() || file.IsDirectory() || file.IsDeleted() {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
updates = append(updates, protocol.FileDownloadProgressUpdate{
|
|
||||||
Name: file.Name,
|
|
||||||
Version: file.Version,
|
|
||||||
UpdateType: protocol.FileDownloadProgressUpdateTypeForget,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return updates
|
|
||||||
}
|
|
||||||
|
|
||||||
// folderDeviceSet is a set of (folder, deviceID) pairs
|
// folderDeviceSet is a set of (folder, deviceID) pairs
|
||||||
type folderDeviceSet map[string]map[protocol.DeviceID]struct{}
|
type folderDeviceSet map[string]map[protocol.DeviceID]struct{}
|
||||||
|
|
||||||
|
@ -220,15 +220,16 @@ func BenchmarkIndex_100(b *testing.B) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func benchmarkIndex(b *testing.B, nfiles int) {
|
func benchmarkIndex(b *testing.B, nfiles int) {
|
||||||
m := setupModel(b, defaultCfgWrapper)
|
m, _, fcfg, wcfgCancel := setupModelWithConnection(b)
|
||||||
defer cleanupModel(m)
|
defer wcfgCancel()
|
||||||
|
defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI())
|
||||||
|
|
||||||
files := genFiles(nfiles)
|
files := genFiles(nfiles)
|
||||||
must(b, m.Index(device1, "default", files))
|
must(b, m.Index(device1, fcfg.ID, files))
|
||||||
|
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
must(b, m.Index(device1, "default", files))
|
must(b, m.Index(device1, fcfg.ID, files))
|
||||||
}
|
}
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
}
|
}
|
||||||
@ -246,17 +247,18 @@ func BenchmarkIndexUpdate_10000_1(b *testing.B) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func benchmarkIndexUpdate(b *testing.B, nfiles, nufiles int) {
|
func benchmarkIndexUpdate(b *testing.B, nfiles, nufiles int) {
|
||||||
m := setupModel(b, defaultCfgWrapper)
|
m, _, fcfg, wcfgCancel := setupModelWithConnection(b)
|
||||||
defer cleanupModel(m)
|
defer wcfgCancel()
|
||||||
|
defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI())
|
||||||
|
|
||||||
files := genFiles(nfiles)
|
files := genFiles(nfiles)
|
||||||
ufiles := genFiles(nufiles)
|
ufiles := genFiles(nufiles)
|
||||||
|
|
||||||
must(b, m.Index(device1, "default", files))
|
must(b, m.Index(device1, fcfg.ID, files))
|
||||||
|
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
must(b, m.IndexUpdate(device1, "default", ufiles))
|
must(b, m.IndexUpdate(device1, fcfg.ID, ufiles))
|
||||||
}
|
}
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
}
|
}
|
||||||
@ -1695,9 +1697,8 @@ func TestRWScanRecovery(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestGlobalDirectoryTree(t *testing.T) {
|
func TestGlobalDirectoryTree(t *testing.T) {
|
||||||
w, fcfg, wCancel := tmpDefaultWrapper()
|
m, _, fcfg, wCancel := setupModelWithConnection(t)
|
||||||
defer wCancel()
|
defer wCancel()
|
||||||
m := setupModel(t, w)
|
|
||||||
defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI())
|
defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI())
|
||||||
|
|
||||||
b := func(isfile bool, path ...string) protocol.FileInfo {
|
b := func(isfile bool, path ...string) protocol.FileInfo {
|
||||||
@ -1999,18 +2000,18 @@ func BenchmarkTree_100_10(b *testing.B) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func benchmarkTree(b *testing.B, n1, n2 int) {
|
func benchmarkTree(b *testing.B, n1, n2 int) {
|
||||||
m := newModel(b, defaultCfgWrapper, myID, "syncthing", "dev", nil)
|
m, _, fcfg, wcfgCancel := setupModelWithConnection(b)
|
||||||
m.ServeBackground()
|
defer wcfgCancel()
|
||||||
defer cleanupModel(m)
|
defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI())
|
||||||
|
|
||||||
m.ScanFolder("default")
|
m.ScanFolder(fcfg.ID)
|
||||||
files := genDeepFiles(n1, n2)
|
files := genDeepFiles(n1, n2)
|
||||||
|
|
||||||
must(b, m.Index(device1, "default", files))
|
must(b, m.Index(device1, fcfg.ID, files))
|
||||||
|
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
m.GlobalDirectoryTree("default", "", -1, false)
|
m.GlobalDirectoryTree(fcfg.ID, "", -1, false)
|
||||||
}
|
}
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
}
|
}
|
||||||
@ -2627,43 +2628,43 @@ func TestCustomMarkerName(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestRemoveDirWithContent(t *testing.T) {
|
func TestRemoveDirWithContent(t *testing.T) {
|
||||||
defer func() {
|
m, _, fcfg, wcfgCancel := setupModelWithConnection(t)
|
||||||
defaultFs.RemoveAll("dirwith")
|
defer wcfgCancel()
|
||||||
}()
|
tfs := fcfg.Filesystem()
|
||||||
|
defer cleanupModelAndRemoveDir(m, tfs.URI())
|
||||||
|
|
||||||
defaultFs.MkdirAll("dirwith", 0755)
|
tfs.MkdirAll("dirwith", 0755)
|
||||||
content := filepath.Join("dirwith", "content")
|
content := filepath.Join("dirwith", "content")
|
||||||
fd, err := defaultFs.Create(content)
|
fd, err := tfs.Create(content)
|
||||||
must(t, err)
|
must(t, err)
|
||||||
fd.Close()
|
fd.Close()
|
||||||
|
|
||||||
m := setupModel(t, defaultCfgWrapper)
|
must(t, m.ScanFolder(fcfg.ID))
|
||||||
defer cleanupModel(m)
|
|
||||||
|
|
||||||
dir, ok := m.testCurrentFolderFile("default", "dirwith")
|
dir, ok := m.testCurrentFolderFile(fcfg.ID, "dirwith")
|
||||||
if !ok {
|
if !ok {
|
||||||
t.Fatalf("Can't get dir \"dirwith\" after initial scan")
|
t.Fatalf("Can't get dir \"dirwith\" after initial scan")
|
||||||
}
|
}
|
||||||
dir.Deleted = true
|
dir.Deleted = true
|
||||||
dir.Version = dir.Version.Update(device1.Short()).Update(device1.Short())
|
dir.Version = dir.Version.Update(device1.Short()).Update(device1.Short())
|
||||||
|
|
||||||
file, ok := m.testCurrentFolderFile("default", content)
|
file, ok := m.testCurrentFolderFile(fcfg.ID, content)
|
||||||
if !ok {
|
if !ok {
|
||||||
t.Fatalf("Can't get file \"%v\" after initial scan", content)
|
t.Fatalf("Can't get file \"%v\" after initial scan", content)
|
||||||
}
|
}
|
||||||
file.Deleted = true
|
file.Deleted = true
|
||||||
file.Version = file.Version.Update(device1.Short()).Update(device1.Short())
|
file.Version = file.Version.Update(device1.Short()).Update(device1.Short())
|
||||||
|
|
||||||
must(t, m.IndexUpdate(device1, "default", []protocol.FileInfo{dir, file}))
|
must(t, m.IndexUpdate(device1, fcfg.ID, []protocol.FileInfo{dir, file}))
|
||||||
|
|
||||||
// Is there something we could trigger on instead of just waiting?
|
// Is there something we could trigger on instead of just waiting?
|
||||||
timeout := time.NewTimer(5 * time.Second)
|
timeout := time.NewTimer(5 * time.Second)
|
||||||
for {
|
for {
|
||||||
dir, ok := m.testCurrentFolderFile("default", "dirwith")
|
dir, ok := m.testCurrentFolderFile(fcfg.ID, "dirwith")
|
||||||
if !ok {
|
if !ok {
|
||||||
t.Fatalf("Can't get dir \"dirwith\" after index update")
|
t.Fatalf("Can't get dir \"dirwith\" after index update")
|
||||||
}
|
}
|
||||||
file, ok := m.testCurrentFolderFile("default", content)
|
file, ok := m.testCurrentFolderFile(fcfg.ID, content)
|
||||||
if !ok {
|
if !ok {
|
||||||
t.Fatalf("Can't get file \"%v\" after index update", content)
|
t.Fatalf("Can't get file \"%v\" after index update", content)
|
||||||
}
|
}
|
||||||
@ -3766,14 +3767,14 @@ func TestAddFolderCompletion(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestScanDeletedROChangedOnSR(t *testing.T) {
|
func TestScanDeletedROChangedOnSR(t *testing.T) {
|
||||||
w, fcfg, wCancel := tmpDefaultWrapper()
|
m, _, fcfg, wCancel := setupModelWithConnection(t)
|
||||||
defer wCancel()
|
|
||||||
fcfg.Type = config.FolderTypeReceiveOnly
|
|
||||||
setFolder(t, w, fcfg)
|
|
||||||
m := setupModel(t, w)
|
|
||||||
defer cleanupModel(m)
|
|
||||||
name := "foo"
|
|
||||||
ffs := fcfg.Filesystem()
|
ffs := fcfg.Filesystem()
|
||||||
|
defer wCancel()
|
||||||
|
defer cleanupModelAndRemoveDir(m, ffs.URI())
|
||||||
|
fcfg.Type = config.FolderTypeReceiveOnly
|
||||||
|
setFolder(t, m.cfg, fcfg)
|
||||||
|
|
||||||
|
name := "foo"
|
||||||
|
|
||||||
must(t, writeFile(ffs, name, []byte(name), 0644))
|
must(t, writeFile(ffs, name, []byte(name), 0644))
|
||||||
m.ScanFolders()
|
m.ScanFolders()
|
||||||
@ -3794,7 +3795,7 @@ func TestScanDeletedROChangedOnSR(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fcfg.Type = config.FolderTypeSendReceive
|
fcfg.Type = config.FolderTypeSendReceive
|
||||||
setFolder(t, w, fcfg)
|
setFolder(t, m.cfg, fcfg)
|
||||||
m.ScanFolders()
|
m.ScanFolders()
|
||||||
|
|
||||||
if receiveOnlyChangedSize(t, m, fcfg.ID).Deleted != 0 {
|
if receiveOnlyChangedSize(t, m, fcfg.ID).Deleted != 0 {
|
||||||
@ -3889,6 +3890,8 @@ func TestIssue6961(t *testing.T) {
|
|||||||
}
|
}
|
||||||
m.ServeBackground()
|
m.ServeBackground()
|
||||||
defer cleanupModelAndRemoveDir(m, tfs.URI())
|
defer cleanupModelAndRemoveDir(m, tfs.URI())
|
||||||
|
addFakeConn(m, device1, fcfg.ID)
|
||||||
|
addFakeConn(m, device2, fcfg.ID)
|
||||||
m.ScanFolders()
|
m.ScanFolders()
|
||||||
|
|
||||||
name := "foo"
|
name := "foo"
|
||||||
@ -3937,9 +3940,8 @@ func TestIssue6961(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestCompletionEmptyGlobal(t *testing.T) {
|
func TestCompletionEmptyGlobal(t *testing.T) {
|
||||||
wcfg, fcfg, wcfgCancel := tmpDefaultWrapper()
|
m, _, fcfg, wcfgCancel := setupModelWithConnection(t)
|
||||||
defer wcfgCancel()
|
defer wcfgCancel()
|
||||||
m := setupModel(t, wcfg)
|
|
||||||
defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI())
|
defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI())
|
||||||
files := []protocol.FileInfo{{Name: "foo", Version: protocol.Vector{}.Update(myID.Short()), Sequence: 1}}
|
files := []protocol.FileInfo{{Name: "foo", Version: protocol.Vector{}.Update(myID.Short()), Sequence: 1}}
|
||||||
m.fmut.Lock()
|
m.fmut.Lock()
|
||||||
@ -3960,6 +3962,8 @@ func TestNeedMetaAfterIndexReset(t *testing.T) {
|
|||||||
addDevice2(t, w, fcfg)
|
addDevice2(t, w, fcfg)
|
||||||
m := setupModel(t, w)
|
m := setupModel(t, w)
|
||||||
defer cleanupModelAndRemoveDir(m, fcfg.Path)
|
defer cleanupModelAndRemoveDir(m, fcfg.Path)
|
||||||
|
addFakeConn(m, device1, fcfg.ID)
|
||||||
|
addFakeConn(m, device2, fcfg.ID)
|
||||||
|
|
||||||
var seq int64 = 1
|
var seq int64 = 1
|
||||||
files := []protocol.FileInfo{{Name: "foo", Size: 10, Version: protocol.Vector{}.Update(device1.Short()), Sequence: seq}}
|
files := []protocol.FileInfo{{Name: "foo", Size: 10, Version: protocol.Vector{}.Update(device1.Short()), Sequence: seq}}
|
||||||
@ -4097,7 +4101,7 @@ func TestCcCheckEncryption(t *testing.T) {
|
|||||||
dcfg.EncryptionPassword = pw
|
dcfg.EncryptionPassword = pw
|
||||||
}
|
}
|
||||||
|
|
||||||
deviceInfos := &indexSenderStartInfo{
|
deviceInfos := &clusterConfigDeviceInfo{
|
||||||
remote: protocol.Device{ID: device1, EncryptionPasswordToken: tc.tokenRemote},
|
remote: protocol.Device{ID: device1, EncryptionPasswordToken: tc.tokenRemote},
|
||||||
local: protocol.Device{ID: myID, EncryptionPasswordToken: tc.tokenLocal},
|
local: protocol.Device{ID: myID, EncryptionPasswordToken: tc.tokenLocal},
|
||||||
}
|
}
|
||||||
|
@ -1446,6 +1446,7 @@ func TestRequestGlobalInvalidToValid(t *testing.T) {
|
|||||||
})
|
})
|
||||||
must(t, err)
|
must(t, err)
|
||||||
waiter.Wait()
|
waiter.Wait()
|
||||||
|
addFakeConn(m, device2, fcfg.ID)
|
||||||
tfs := fcfg.Filesystem()
|
tfs := fcfg.Filesystem()
|
||||||
defer cleanupModelAndRemoveDir(m, tfs.URI())
|
defer cleanupModelAndRemoveDir(m, tfs.URI())
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user