diff --git a/lib/fs/fakefs.go b/lib/fs/fakefs.go index 006cae4b9..4d75ab0f1 100644 --- a/lib/fs/fakefs.go +++ b/lib/fs/fakefs.go @@ -885,7 +885,13 @@ func (f *fakeFile) Truncate(size int64) error { defer f.mut.Unlock() if f.content != nil { - f.content = f.content[:int(size)] + if int64(cap(f.content)) < size { + c := make([]byte, size) + copy(c[:len(f.content)], f.content) + f.content = c + } else { + f.content = f.content[:int(size)] + } } f.rng = nil f.size = size diff --git a/lib/model/folder_recvonly_test.go b/lib/model/folder_recvonly_test.go index a65c792dd..094efe504 100644 --- a/lib/model/folder_recvonly_test.go +++ b/lib/model/folder_recvonly_test.go @@ -29,6 +29,7 @@ func TestRecvOnlyRevertDeletes(t *testing.T) { defer wcfgCancel() ffs := f.Filesystem() defer cleanupModel(m) + addFakeConn(m, device1, f.ID) // Create some test data @@ -110,6 +111,7 @@ func TestRecvOnlyRevertNeeds(t *testing.T) { defer wcfgCancel() ffs := f.Filesystem() defer cleanupModel(m) + addFakeConn(m, device1, f.ID) // Create some test data @@ -199,6 +201,7 @@ func TestRecvOnlyUndoChanges(t *testing.T) { defer wcfgCancel() ffs := f.Filesystem() defer cleanupModel(m) + addFakeConn(m, device1, f.ID) // Create some test data @@ -268,6 +271,7 @@ func TestRecvOnlyDeletedRemoteDrop(t *testing.T) { defer wcfgCancel() ffs := f.Filesystem() defer cleanupModel(m) + addFakeConn(m, device1, f.ID) // Create some test data @@ -332,6 +336,7 @@ func TestRecvOnlyRemoteUndoChanges(t *testing.T) { defer wcfgCancel() ffs := f.Filesystem() defer cleanupModel(m) + addFakeConn(m, device1, f.ID) // Create some test data diff --git a/lib/model/folder_sendrecv_test.go b/lib/model/folder_sendrecv_test.go index c2056b742..38f86b39a 100644 --- a/lib/model/folder_sendrecv_test.go +++ b/lib/model/folder_sendrecv_test.go @@ -1280,6 +1280,7 @@ func TestPullSymlinkOverExistingWindows(t *testing.T) { m, f, wcfgCancel := setupSendReceiveFolder(t) defer cleanupSRFolder(f, m, wcfgCancel) + addFakeConn(m, device1, f.ID) name := "foo" if fd, err := f.mtimefs.Create(name); err != nil { diff --git a/lib/model/indexhandler.go b/lib/model/indexhandler.go new file mode 100644 index 000000000..eac421aeb --- /dev/null +++ b/lib/model/indexhandler.go @@ -0,0 +1,549 @@ +// Copyright (C) 2020 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at https://mozilla.org/MPL/2.0/. + +package model + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/thejerf/suture/v4" + + "github.com/syncthing/syncthing/lib/config" + "github.com/syncthing/syncthing/lib/db" + "github.com/syncthing/syncthing/lib/events" + "github.com/syncthing/syncthing/lib/protocol" + "github.com/syncthing/syncthing/lib/svcutil" +) + +type indexHandler struct { + conn protocol.Connection + downloads *deviceDownloadState + folder string + folderIsReceiveEncrypted bool + prevSequence int64 + evLogger events.Logger + token suture.ServiceToken + + cond *sync.Cond + paused bool + fset *db.FileSet + runner service +} + +func newIndexHandler(conn protocol.Connection, downloads *deviceDownloadState, folder config.FolderConfiguration, fset *db.FileSet, runner service, startInfo *clusterConfigDeviceInfo, evLogger events.Logger) *indexHandler { + myIndexID := fset.IndexID(protocol.LocalDeviceID) + mySequence := fset.Sequence(protocol.LocalDeviceID) + var startSequence int64 + + // This is the other side's description of what it knows + // about us. Lets check to see if we can start sending index + // updates directly or need to send the index from start... + + if startInfo.local.IndexID == myIndexID { + // They say they've seen our index ID before, so we can + // send a delta update only. + + if startInfo.local.MaxSequence > mySequence { + // Safety check. They claim to have more or newer + // index data than we have - either we have lost + // index data, or reset the index without resetting + // the IndexID, or something else weird has + // happened. We send a full index to reset the + // situation. + l.Infof("Device %v folder %s is delta index compatible, but seems out of sync with reality", conn.ID().Short(), folder.Description()) + startSequence = 0 + } else { + l.Debugf("Device %v folder %s is delta index compatible (mlv=%d)", conn.ID().Short(), folder.Description(), startInfo.local.MaxSequence) + startSequence = startInfo.local.MaxSequence + } + } else if startInfo.local.IndexID != 0 { + // They say they've seen an index ID from us, but it's + // not the right one. Either they are confused or we + // must have reset our database since last talking to + // them. We'll start with a full index transfer. + l.Infof("Device %v folder %s has mismatching index ID for us (%v != %v)", conn.ID().Short(), folder.Description(), startInfo.local.IndexID, myIndexID) + startSequence = 0 + } else { + l.Debugf("Device %v folder %s has no index ID for us", conn.ID().Short(), folder.Description()) + } + + // This is the other side's description of themselves. We + // check to see that it matches the IndexID we have on file, + // otherwise we drop our old index data and expect to get a + // completely new set. + + theirIndexID := fset.IndexID(conn.ID()) + if startInfo.remote.IndexID == 0 { + // They're not announcing an index ID. This means they + // do not support delta indexes and we should clear any + // information we have from them before accepting their + // index, which will presumably be a full index. + l.Debugf("Device %v folder %s does not announce an index ID", conn.ID().Short(), folder.Description()) + fset.Drop(conn.ID()) + } else if startInfo.remote.IndexID != theirIndexID { + // The index ID we have on file is not what they're + // announcing. They must have reset their database and + // will probably send us a full index. We drop any + // information we have and remember this new index ID + // instead. + l.Infof("Device %v folder %s has a new index ID (%v)", conn.ID().Short(), folder.Description(), startInfo.remote.IndexID) + fset.Drop(conn.ID()) + fset.SetIndexID(conn.ID(), startInfo.remote.IndexID) + } + + return &indexHandler{ + conn: conn, + downloads: downloads, + folder: folder.ID, + folderIsReceiveEncrypted: folder.Type == config.FolderTypeReceiveEncrypted, + prevSequence: startSequence, + evLogger: evLogger, + + fset: fset, + runner: runner, + cond: sync.NewCond(new(sync.Mutex)), + } +} + +func (s *indexHandler) Serve(ctx context.Context) (err error) { + l.Debugf("Starting index handler for %s to %s at %s (slv=%d)", s.folder, s.conn.ID(), s.conn, s.prevSequence) + defer func() { + err = svcutil.NoRestartErr(err) + l.Debugf("Exiting index handler for %s to %s at %s: %v", s.folder, s.conn.ID(), s.conn, err) + }() + + // We need to send one index, regardless of whether there is something to send or not + s.cond.L.Lock() + for s.paused { + s.cond.Wait() + } + fset := s.fset + s.cond.L.Unlock() + err = s.sendIndexTo(ctx, fset) + + // Subscribe to LocalIndexUpdated (we have new information to send) and + // DeviceDisconnected (it might be us who disconnected, so we should + // exit). + sub := s.evLogger.Subscribe(events.LocalIndexUpdated | events.DeviceDisconnected) + defer sub.Unsubscribe() + + evChan := sub.C() + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + + for err == nil { + s.cond.L.Lock() + for s.paused { + s.cond.Wait() + } + fset := s.fset + s.cond.L.Unlock() + + // While we have sent a sequence at least equal to the one + // currently in the database, wait for the local index to update. The + // local index may update for other folders than the one we are + // sending for. + if fset.Sequence(protocol.LocalDeviceID) <= s.prevSequence { + select { + case <-ctx.Done(): + return ctx.Err() + case <-evChan: + case <-ticker.C: + } + continue + } + + err = s.sendIndexTo(ctx, fset) + + // Wait a short amount of time before entering the next loop. If there + // are continuous changes happening to the local index, this gives us + // time to batch them up a little. + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(250 * time.Millisecond): + } + } + + return err +} + +func (s *indexHandler) resume(fset *db.FileSet, runner service) { + s.cond.L.Lock() + if !s.paused { + s.evLogger.Log(events.Failure, "index handler got resumed while not paused") + } + s.paused = false + s.fset = fset + s.runner = runner + s.cond.L.Unlock() +} + +func (s *indexHandler) pause() { + s.cond.L.Lock() + if s.paused { + s.evLogger.Log(events.Failure, "index handler got paused while already paused") + } + s.paused = true + s.fset = nil + s.runner = nil + s.cond.L.Unlock() +} + +// sendIndexTo sends file infos with a sequence number higher than prevSequence and +// returns the highest sent sequence number. +func (s *indexHandler) sendIndexTo(ctx context.Context, fset *db.FileSet) error { + initial := s.prevSequence == 0 + batch := db.NewFileInfoBatch(nil) + batch.SetFlushFunc(func(fs []protocol.FileInfo) error { + l.Debugf("%v: Sending %d files (<%d bytes)", s, len(fs), batch.Size()) + if initial { + initial = false + return s.conn.Index(ctx, s.folder, fs) + } + return s.conn.IndexUpdate(ctx, s.folder, fs) + }) + + var err error + var f protocol.FileInfo + snap, err := fset.Snapshot() + if err != nil { + return svcutil.AsFatalErr(err, svcutil.ExitError) + } + defer snap.Release() + previousWasDelete := false + snap.WithHaveSequence(s.prevSequence+1, func(fi protocol.FileIntf) bool { + // This is to make sure that renames (which is an add followed by a delete) land in the same batch. + // Even if the batch is full, we allow a last delete to slip in, we do this by making sure that + // the batch ends with a non-delete, or that the last item in the batch is already a delete + if batch.Full() && (!fi.IsDeleted() || previousWasDelete) { + if err = batch.Flush(); err != nil { + return false + } + } + + if shouldDebug() { + if fi.SequenceNo() < s.prevSequence+1 { + panic(fmt.Sprintln("sequence lower than requested, got:", fi.SequenceNo(), ", asked to start at:", s.prevSequence+1)) + } + } + + if f.Sequence > 0 && fi.SequenceNo() <= f.Sequence { + l.Warnln("Non-increasing sequence detected: Checking and repairing the db...") + // Abort this round of index sending - the next one will pick + // up from the last successful one with the repeaired db. + defer func() { + if fixed, dbErr := fset.RepairSequence(); dbErr != nil { + l.Warnln("Failed repairing sequence entries:", dbErr) + panic("Failed repairing sequence entries") + } else { + s.evLogger.Log(events.Failure, "detected and repaired non-increasing sequence") + l.Infof("Repaired %v sequence entries in database", fixed) + } + }() + return false + } + + f = fi.(protocol.FileInfo) + + // If this is a folder receiving encrypted files only, we + // mustn't ever send locally changed file infos. Those aren't + // encrypted and thus would be a protocol error at the remote. + if s.folderIsReceiveEncrypted && fi.IsReceiveOnlyChanged() { + return true + } + + f = prepareFileInfoForIndex(f) + + previousWasDelete = f.IsDeleted() + + batch.Append(f) + return true + }) + if err != nil { + return err + } + + err = batch.Flush() + + // True if there was nothing to be sent + if f.Sequence == 0 { + return err + } + + s.prevSequence = f.Sequence + return err +} + +func (s *indexHandler) receive(fs []protocol.FileInfo, update bool, op string) error { + deviceID := s.conn.ID() + + s.cond.L.Lock() + paused := s.paused + fset := s.fset + runner := s.runner + s.cond.L.Unlock() + + if paused { + l.Infof("%v for paused folder %q", op, s.folder) + return fmt.Errorf("%v: %w", s.folder, ErrFolderPaused) + } + + defer runner.SchedulePull() + + s.downloads.Update(s.folder, makeForgetUpdate(fs)) + + if !update { + fset.Drop(deviceID) + } + for i := range fs { + // The local attributes should never be transmitted over the wire. + // Make sure they look like they weren't. + fs[i].LocalFlags = 0 + fs[i].VersionHash = nil + } + fset.Update(deviceID, fs) + + seq := fset.Sequence(deviceID) + s.evLogger.Log(events.RemoteIndexUpdated, map[string]interface{}{ + "device": deviceID.String(), + "folder": s.folder, + "items": len(fs), + "sequence": seq, + "version": seq, // legacy for sequence + }) + + return nil +} + +func prepareFileInfoForIndex(f protocol.FileInfo) protocol.FileInfo { + // Mark the file as invalid if any of the local bad stuff flags are set. + f.RawInvalid = f.IsInvalid() + // If the file is marked LocalReceive (i.e., changed locally on a + // receive only folder) we do not want it to ever become the + // globally best version, invalid or not. + if f.IsReceiveOnlyChanged() { + f.Version = protocol.Vector{} + } + // never sent externally + f.LocalFlags = 0 + f.VersionHash = nil + return f +} + +func (s *indexHandler) String() string { + return fmt.Sprintf("indexHandler@%p for %s to %s at %s", s, s.folder, s.conn.ID().Short(), s.conn) +} + +type indexHandlerRegistry struct { + sup *suture.Supervisor + evLogger events.Logger + conn protocol.Connection + downloads *deviceDownloadState + indexHandlers map[string]*indexHandler + startInfos map[string]*clusterConfigDeviceInfo + folderStates map[string]*indexHandlerFolderState + mut sync.Mutex +} + +type indexHandlerFolderState struct { + cfg config.FolderConfiguration + fset *db.FileSet + runner service +} + +func newIndexHandlerRegistry(conn protocol.Connection, downloads *deviceDownloadState, closed chan struct{}, parentSup *suture.Supervisor, evLogger events.Logger) *indexHandlerRegistry { + r := &indexHandlerRegistry{ + conn: conn, + downloads: downloads, + evLogger: evLogger, + indexHandlers: make(map[string]*indexHandler), + startInfos: make(map[string]*clusterConfigDeviceInfo), + folderStates: make(map[string]*indexHandlerFolderState), + mut: sync.Mutex{}, + } + r.sup = suture.New(r.String(), svcutil.SpecWithDebugLogger(l)) + ourToken := parentSup.Add(r.sup) + r.sup.Add(svcutil.AsService(func(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-closed: + parentSup.Remove(ourToken) + } + return nil + }, fmt.Sprintf("%v/waitForClosed", r))) + return r +} + +func (r *indexHandlerRegistry) String() string { + return fmt.Sprintf("indexHandlerRegistry/%v", r.conn.ID().Short()) +} + +func (r *indexHandlerRegistry) GetSupervisor() *suture.Supervisor { + return r.sup +} + +func (r *indexHandlerRegistry) startLocked(folder config.FolderConfiguration, fset *db.FileSet, runner service, startInfo *clusterConfigDeviceInfo) { + if is, ok := r.indexHandlers[folder.ID]; ok { + r.sup.RemoveAndWait(is.token, 0) + delete(r.indexHandlers, folder.ID) + } + delete(r.startInfos, folder.ID) + + is := newIndexHandler(r.conn, r.downloads, folder, fset, runner, startInfo, r.evLogger) + is.token = r.sup.Add(is) + r.indexHandlers[folder.ID] = is +} + +// AddIndexInfo starts an index handler for given folder, unless it is paused. +// If it is paused, the given startInfo is stored to start the sender once the +// folder is resumed. +// If an index handler is already running, it will be stopped first. +func (r *indexHandlerRegistry) AddIndexInfo(folder string, startInfo *clusterConfigDeviceInfo) { + r.mut.Lock() + defer r.mut.Unlock() + + if is, ok := r.indexHandlers[folder]; ok { + r.sup.RemoveAndWait(is.token, 0) + delete(r.indexHandlers, folder) + l.Debugf("Removed index sender for device %v and folder %v due to added pending", r.conn.ID().Short(), folder) + } + folderState, ok := r.folderStates[folder] + if !ok { + l.Debugf("Pending index handler for device %v and folder %v", r.conn.ID().Short(), folder) + r.startInfos[folder] = startInfo + return + } + r.startLocked(folderState.cfg, folderState.fset, folderState.runner, startInfo) +} + +// Remove stops a running index handler or removes one pending to be started. +// It is a noop if the folder isn't known. +func (r *indexHandlerRegistry) Remove(folder string) { + r.mut.Lock() + defer r.mut.Unlock() + + l.Debugf("Removing index handler for device %v and folder %v", r.conn.ID().Short(), folder) + if is, ok := r.indexHandlers[folder]; ok { + r.sup.RemoveAndWait(is.token, 0) + delete(r.indexHandlers, folder) + } + delete(r.startInfos, folder) + l.Debugf("Removed index handler for device %v and folder %v", r.conn.ID().Short(), folder) +} + +// RemoveAllExcept stops all running index handlers and removes those pending to be started, +// except mentioned ones. +// It is a noop if the folder isn't known. +func (r *indexHandlerRegistry) RemoveAllExcept(except map[string]struct{}) { + r.mut.Lock() + defer r.mut.Unlock() + + for folder, is := range r.indexHandlers { + if _, ok := except[folder]; !ok { + r.sup.RemoveAndWait(is.token, 0) + delete(r.indexHandlers, folder) + l.Debugf("Removed index handler for device %v and folder %v (removeAllExcept)", r.conn.ID().Short(), folder) + } + } + for folder := range r.startInfos { + if _, ok := except[folder]; !ok { + delete(r.startInfos, folder) + l.Debugf("Removed pending index handler for device %v and folder %v (removeAllExcept)", r.conn.ID().Short(), folder) + } + } +} + +// RegisterFolderState must be called whenever something about the folder +// changes. The exception being if the folder is removed entirely, then call +// Remove. The fset and runner arguments may be nil, if given folder is paused. +func (r *indexHandlerRegistry) RegisterFolderState(folder config.FolderConfiguration, fset *db.FileSet, runner service) { + if !folder.SharedWith(r.conn.ID()) { + r.Remove(folder.ID) + return + } + + r.mut.Lock() + if folder.Paused { + r.folderPausedLocked(folder.ID) + } else { + r.folderStartedLocked(folder, fset, runner) + } + r.mut.Unlock() +} + +// folderPausedLocked stops a running index handler. +// It is a noop if the folder isn't known or has not been started yet. +func (r *indexHandlerRegistry) folderPausedLocked(folder string) { + l.Debugf("Pausing index handler for device %v and folder %v", r.conn.ID().Short(), folder) + delete(r.folderStates, folder) + if is, ok := r.indexHandlers[folder]; ok { + is.pause() + l.Debugf("Paused index handler for device %v and folder %v", r.conn.ID().Short(), folder) + } else { + l.Debugf("No index handler for device %v and folder %v to pause", r.conn.ID().Short(), folder) + } +} + +// folderStartedLocked resumes an already running index handler or starts it, if it +// was added while paused. +// It is a noop if the folder isn't known. +func (r *indexHandlerRegistry) folderStartedLocked(folder config.FolderConfiguration, fset *db.FileSet, runner service) { + r.folderStates[folder.ID] = &indexHandlerFolderState{ + cfg: folder, + fset: fset, + runner: runner, + } + + is, isOk := r.indexHandlers[folder.ID] + if info, ok := r.startInfos[folder.ID]; ok { + if isOk { + r.sup.RemoveAndWait(is.token, 0) + delete(r.indexHandlers, folder.ID) + l.Debugf("Removed index handler for device %v and folder %v in resume", r.conn.ID().Short(), folder.ID) + } + r.startLocked(folder, fset, runner, info) + delete(r.startInfos, folder.ID) + l.Debugf("Started index handler for device %v and folder %v in resume", r.conn.ID().Short(), folder.ID) + } else if isOk { + l.Debugf("Resuming index handler for device %v and folder %v", r.conn.ID().Short(), folder) + is.resume(fset, runner) + } else { + l.Debugf("Not resuming index handler for device %v and folder %v as none is paused and there is no start info", r.conn.ID().Short(), folder.ID) + } +} + +func (r *indexHandlerRegistry) ReceiveIndex(folder string, fs []protocol.FileInfo, update bool, op string) error { + r.mut.Lock() + defer r.mut.Unlock() + is, isOk := r.indexHandlers[folder] + if !isOk { + l.Infof("%v for nonexistent or paused folder %q", op, folder) + return ErrFolderMissing + } + return is.receive(fs, update, op) +} + +// makeForgetUpdate takes an index update and constructs a download progress update +// causing to forget any progress for files which we've just been sent. +func makeForgetUpdate(files []protocol.FileInfo) []protocol.FileDownloadProgressUpdate { + updates := make([]protocol.FileDownloadProgressUpdate, 0, len(files)) + for _, file := range files { + if file.IsSymlink() || file.IsDirectory() || file.IsDeleted() { + continue + } + updates = append(updates, protocol.FileDownloadProgressUpdate{ + Name: file.Name, + Version: file.Version, + UpdateType: protocol.FileDownloadProgressUpdateTypeForget, + }) + } + return updates +} diff --git a/lib/model/indexsender.go b/lib/model/indexsender.go deleted file mode 100644 index c6dbcb237..000000000 --- a/lib/model/indexsender.go +++ /dev/null @@ -1,430 +0,0 @@ -// Copyright (C) 2020 The Syncthing Authors. -// -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this file, -// You can obtain one at https://mozilla.org/MPL/2.0/. - -package model - -import ( - "context" - "fmt" - "sync" - "time" - - "github.com/thejerf/suture/v4" - - "github.com/syncthing/syncthing/lib/config" - "github.com/syncthing/syncthing/lib/db" - "github.com/syncthing/syncthing/lib/events" - "github.com/syncthing/syncthing/lib/protocol" - "github.com/syncthing/syncthing/lib/svcutil" -) - -type indexSender struct { - conn protocol.Connection - folder string - folderIsReceiveEncrypted bool - fset *db.FileSet - prevSequence int64 - evLogger events.Logger - connClosed chan struct{} - done chan struct{} - token suture.ServiceToken - pauseChan chan struct{} - resumeChan chan *db.FileSet -} - -func (s *indexSender) Serve(ctx context.Context) (err error) { - l.Debugf("Starting indexSender for %s to %s at %s (slv=%d)", s.folder, s.conn.ID(), s.conn, s.prevSequence) - defer func() { - close(s.done) - err = svcutil.NoRestartErr(err) - l.Debugf("Exiting indexSender for %s to %s at %s: %v", s.folder, s.conn.ID(), s.conn, err) - }() - - // We need to send one index, regardless of whether there is something to send or not - err = s.sendIndexTo(ctx) - - // Subscribe to LocalIndexUpdated (we have new information to send) and - // DeviceDisconnected (it might be us who disconnected, so we should - // exit). - sub := s.evLogger.Subscribe(events.LocalIndexUpdated | events.DeviceDisconnected) - defer sub.Unsubscribe() - - paused := false - evChan := sub.C() - ticker := time.NewTicker(time.Minute) - defer ticker.Stop() - - for err == nil { - select { - case <-ctx.Done(): - return ctx.Err() - case <-s.connClosed: - return nil - default: - } - - // While we have sent a sequence at least equal to the one - // currently in the database, wait for the local index to update. The - // local index may update for other folders than the one we are - // sending for. - if s.fset.Sequence(protocol.LocalDeviceID) <= s.prevSequence { - select { - case <-ctx.Done(): - return ctx.Err() - case <-s.connClosed: - return nil - case <-evChan: - case <-ticker.C: - case <-s.pauseChan: - paused = true - case s.fset = <-s.resumeChan: - paused = false - } - - continue - } - - if !paused { - err = s.sendIndexTo(ctx) - } - - // Wait a short amount of time before entering the next loop. If there - // are continuous changes happening to the local index, this gives us - // time to batch them up a little. - time.Sleep(250 * time.Millisecond) - } - - return err -} - -func (s *indexSender) resume(fset *db.FileSet) { - select { - case <-s.done: - case s.resumeChan <- fset: - } -} - -func (s *indexSender) pause() { - select { - case <-s.done: - case s.pauseChan <- struct{}{}: - } -} - -// sendIndexTo sends file infos with a sequence number higher than prevSequence and -// returns the highest sent sequence number. -func (s *indexSender) sendIndexTo(ctx context.Context) error { - initial := s.prevSequence == 0 - batch := db.NewFileInfoBatch(nil) - batch.SetFlushFunc(func(fs []protocol.FileInfo) error { - l.Debugf("%v: Sending %d files (<%d bytes)", s, len(fs), batch.Size()) - if initial { - initial = false - return s.conn.Index(ctx, s.folder, fs) - } - return s.conn.IndexUpdate(ctx, s.folder, fs) - }) - - var err error - var f protocol.FileInfo - snap, err := s.fset.Snapshot() - if err != nil { - return svcutil.AsFatalErr(err, svcutil.ExitError) - } - defer snap.Release() - previousWasDelete := false - snap.WithHaveSequence(s.prevSequence+1, func(fi protocol.FileIntf) bool { - // This is to make sure that renames (which is an add followed by a delete) land in the same batch. - // Even if the batch is full, we allow a last delete to slip in, we do this by making sure that - // the batch ends with a non-delete, or that the last item in the batch is already a delete - if batch.Full() && (!fi.IsDeleted() || previousWasDelete) { - if err = batch.Flush(); err != nil { - return false - } - } - - if shouldDebug() { - if fi.SequenceNo() < s.prevSequence+1 { - panic(fmt.Sprintln("sequence lower than requested, got:", fi.SequenceNo(), ", asked to start at:", s.prevSequence+1)) - } - } - - if f.Sequence > 0 && fi.SequenceNo() <= f.Sequence { - l.Warnln("Non-increasing sequence detected: Checking and repairing the db...") - // Abort this round of index sending - the next one will pick - // up from the last successful one with the repeaired db. - defer func() { - if fixed, dbErr := s.fset.RepairSequence(); dbErr != nil { - l.Warnln("Failed repairing sequence entries:", dbErr) - panic("Failed repairing sequence entries") - } else { - s.evLogger.Log(events.Failure, "detected and repaired non-increasing sequence") - l.Infof("Repaired %v sequence entries in database", fixed) - } - }() - return false - } - - f = fi.(protocol.FileInfo) - - // If this is a folder receiving encrypted files only, we - // mustn't ever send locally changed file infos. Those aren't - // encrypted and thus would be a protocol error at the remote. - if s.folderIsReceiveEncrypted && fi.IsReceiveOnlyChanged() { - return true - } - - f = prepareFileInfoForIndex(f) - - previousWasDelete = f.IsDeleted() - - batch.Append(f) - return true - }) - if err != nil { - return err - } - - err = batch.Flush() - - // True if there was nothing to be sent - if f.Sequence == 0 { - return err - } - - s.prevSequence = f.Sequence - return err -} - -func prepareFileInfoForIndex(f protocol.FileInfo) protocol.FileInfo { - // Mark the file as invalid if any of the local bad stuff flags are set. - f.RawInvalid = f.IsInvalid() - // If the file is marked LocalReceive (i.e., changed locally on a - // receive only folder) we do not want it to ever become the - // globally best version, invalid or not. - if f.IsReceiveOnlyChanged() { - f.Version = protocol.Vector{} - } - // never sent externally - f.LocalFlags = 0 - f.VersionHash = nil - return f -} - -func (s *indexSender) String() string { - return fmt.Sprintf("indexSender@%p for %s to %s at %s", s, s.folder, s.conn.ID(), s.conn) -} - -type indexSenderRegistry struct { - deviceID protocol.DeviceID - sup *suture.Supervisor - evLogger events.Logger - conn protocol.Connection - closed chan struct{} - indexSenders map[string]*indexSender - startInfos map[string]*indexSenderStartInfo - mut sync.Mutex -} - -func newIndexSenderRegistry(conn protocol.Connection, closed chan struct{}, sup *suture.Supervisor, evLogger events.Logger) *indexSenderRegistry { - return &indexSenderRegistry{ - deviceID: conn.ID(), - conn: conn, - closed: closed, - sup: sup, - evLogger: evLogger, - indexSenders: make(map[string]*indexSender), - startInfos: make(map[string]*indexSenderStartInfo), - mut: sync.Mutex{}, - } -} - -// add starts an index sender for given folder. -// If an index sender is already running, it will be stopped first. -func (r *indexSenderRegistry) add(folder config.FolderConfiguration, fset *db.FileSet, startInfo *indexSenderStartInfo) { - r.mut.Lock() - r.addLocked(folder, fset, startInfo) - l.Debugf("Started index sender for device %v and folder %v", r.deviceID.Short(), folder.ID) - r.mut.Unlock() -} - -func (r *indexSenderRegistry) addLocked(folder config.FolderConfiguration, fset *db.FileSet, startInfo *indexSenderStartInfo) { - myIndexID := fset.IndexID(protocol.LocalDeviceID) - mySequence := fset.Sequence(protocol.LocalDeviceID) - var startSequence int64 - - // This is the other side's description of what it knows - // about us. Lets check to see if we can start sending index - // updates directly or need to send the index from start... - - if startInfo.local.IndexID == myIndexID { - // They say they've seen our index ID before, so we can - // send a delta update only. - - if startInfo.local.MaxSequence > mySequence { - // Safety check. They claim to have more or newer - // index data than we have - either we have lost - // index data, or reset the index without resetting - // the IndexID, or something else weird has - // happened. We send a full index to reset the - // situation. - l.Infof("Device %v folder %s is delta index compatible, but seems out of sync with reality", r.deviceID, folder.Description()) - startSequence = 0 - } else { - l.Debugf("Device %v folder %s is delta index compatible (mlv=%d)", r.deviceID, folder.Description(), startInfo.local.MaxSequence) - startSequence = startInfo.local.MaxSequence - } - } else if startInfo.local.IndexID != 0 { - // They say they've seen an index ID from us, but it's - // not the right one. Either they are confused or we - // must have reset our database since last talking to - // them. We'll start with a full index transfer. - l.Infof("Device %v folder %s has mismatching index ID for us (%v != %v)", r.deviceID, folder.Description(), startInfo.local.IndexID, myIndexID) - startSequence = 0 - } else { - l.Debugf("Device %v folder %s has no index ID for us", r.deviceID, folder.Description()) - } - - // This is the other side's description of themselves. We - // check to see that it matches the IndexID we have on file, - // otherwise we drop our old index data and expect to get a - // completely new set. - - theirIndexID := fset.IndexID(r.deviceID) - if startInfo.remote.IndexID == 0 { - // They're not announcing an index ID. This means they - // do not support delta indexes and we should clear any - // information we have from them before accepting their - // index, which will presumably be a full index. - l.Debugf("Device %v folder %s does not announce an index ID", r.deviceID, folder.Description()) - fset.Drop(r.deviceID) - } else if startInfo.remote.IndexID != theirIndexID { - // The index ID we have on file is not what they're - // announcing. They must have reset their database and - // will probably send us a full index. We drop any - // information we have and remember this new index ID - // instead. - l.Infof("Device %v folder %s has a new index ID (%v)", r.deviceID, folder.Description(), startInfo.remote.IndexID) - fset.Drop(r.deviceID) - fset.SetIndexID(r.deviceID, startInfo.remote.IndexID) - } - - if is, ok := r.indexSenders[folder.ID]; ok { - r.sup.RemoveAndWait(is.token, 0) - delete(r.indexSenders, folder.ID) - } - delete(r.startInfos, folder.ID) - - is := &indexSender{ - conn: r.conn, - connClosed: r.closed, - done: make(chan struct{}), - folder: folder.ID, - folderIsReceiveEncrypted: folder.Type == config.FolderTypeReceiveEncrypted, - fset: fset, - prevSequence: startSequence, - evLogger: r.evLogger, - pauseChan: make(chan struct{}), - resumeChan: make(chan *db.FileSet), - } - is.token = r.sup.Add(is) - r.indexSenders[folder.ID] = is -} - -// addPending stores the given info to start an index sender once resume is called -// for this folder. -// If an index sender is already running, it will be stopped. -func (r *indexSenderRegistry) addPending(folder string, startInfo *indexSenderStartInfo) { - r.mut.Lock() - defer r.mut.Unlock() - - if is, ok := r.indexSenders[folder]; ok { - r.sup.RemoveAndWait(is.token, 0) - delete(r.indexSenders, folder) - l.Debugf("Removed index sender for device %v and folder %v due to added pending", r.deviceID.Short(), folder) - } - r.startInfos[folder] = startInfo - l.Debugf("Pending index sender for device %v and folder %v", r.deviceID.Short(), folder) -} - -// remove stops a running index sender or removes one pending to be started. -// It is a noop if the folder isn't known. -func (r *indexSenderRegistry) remove(folder string) { - r.mut.Lock() - defer r.mut.Unlock() - - if is, ok := r.indexSenders[folder]; ok { - r.sup.RemoveAndWait(is.token, 0) - delete(r.indexSenders, folder) - } - delete(r.startInfos, folder) - l.Debugf("Removed index sender for device %v and folder %v", r.deviceID.Short(), folder) -} - -// removeAllExcept stops all running index senders and removes those pending to be started, -// except mentioned ones. -// It is a noop if the folder isn't known. -func (r *indexSenderRegistry) removeAllExcept(except map[string]struct{}) { - r.mut.Lock() - defer r.mut.Unlock() - - for folder, is := range r.indexSenders { - if _, ok := except[folder]; !ok { - r.sup.RemoveAndWait(is.token, 0) - delete(r.indexSenders, folder) - l.Debugf("Removed index sender for device %v and folder %v (removeAllExcept)", r.deviceID.Short(), folder) - } - } - for folder := range r.startInfos { - if _, ok := except[folder]; !ok { - delete(r.startInfos, folder) - l.Debugf("Removed pending index sender for device %v and folder %v (removeAllExcept)", r.deviceID.Short(), folder) - } - } -} - -// pause stops a running index sender. -// It is a noop if the folder isn't known or has not been started yet. -func (r *indexSenderRegistry) pause(folder string) { - r.mut.Lock() - defer r.mut.Unlock() - - if is, ok := r.indexSenders[folder]; ok { - is.pause() - l.Debugf("Paused index sender for device %v and folder %v", r.deviceID.Short(), folder) - } else { - l.Debugf("No index sender for device %v and folder %v to pause", r.deviceID.Short(), folder) - } -} - -// resume unpauses an already running index sender or starts it, if it was added -// while paused. -// It is a noop if the folder isn't known. -func (r *indexSenderRegistry) resume(folder config.FolderConfiguration, fset *db.FileSet) { - r.mut.Lock() - defer r.mut.Unlock() - - is, isOk := r.indexSenders[folder.ID] - if info, ok := r.startInfos[folder.ID]; ok { - if isOk { - r.sup.RemoveAndWait(is.token, 0) - delete(r.indexSenders, folder.ID) - l.Debugf("Removed index sender for device %v and folder %v in resume", r.deviceID.Short(), folder.ID) - } - r.addLocked(folder, fset, info) - delete(r.startInfos, folder.ID) - l.Debugf("Started index sender for device %v and folder %v in resume", r.deviceID.Short(), folder.ID) - } else if isOk { - is.resume(fset) - l.Debugf("Resume index sender for device %v and folder %v", r.deviceID.Short(), folder.ID) - } else { - l.Debugf("Not resuming index sender for device %v and folder %v as none is paused and there is no start info", r.deviceID.Short(), folder.ID) - } -} - -type indexSenderStartInfo struct { - local, remote protocol.Device -} diff --git a/lib/model/model.go b/lib/model/model.go index 8d996c789..afd4a2fdb 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -158,7 +158,7 @@ type model struct { helloMessages map[protocol.DeviceID]protocol.Hello deviceDownloads map[protocol.DeviceID]*deviceDownloadState remotePausedFolders map[protocol.DeviceID]map[string]struct{} // deviceID -> folders - indexSenders map[protocol.DeviceID]*indexSenderRegistry + indexHandlers map[protocol.DeviceID]*indexHandlerRegistry // for testing only foldersRunning int32 @@ -246,7 +246,7 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio helloMessages: make(map[protocol.DeviceID]protocol.Hello), deviceDownloads: make(map[protocol.DeviceID]*deviceDownloadState), remotePausedFolders: make(map[protocol.DeviceID]map[string]struct{}), - indexSenders: make(map[protocol.DeviceID]*indexSenderRegistry), + indexHandlers: make(map[protocol.DeviceID]*indexHandlerRegistry), } for devID := range cfg.Devices() { m.deviceStatRefs[devID] = stats.NewDeviceStatisticsReference(m.db, devID) @@ -485,8 +485,8 @@ func (m *model) removeFolder(cfg config.FolderConfiguration) { } m.cleanupFolderLocked(cfg) - for _, r := range m.indexSenders { - r.remove(cfg.ID) + for _, r := range m.indexHandlers { + r.Remove(cfg.ID) } m.fmut.Unlock() @@ -558,21 +558,9 @@ func (m *model) restartFolder(from, to config.FolderConfiguration, cacheIgnoredF // Care needs to be taken because we already hold fmut and the lock order // must be the same everywhere. As fmut is acquired first, this is fine. - // toDeviceIDs := to.DeviceIDs() m.pmut.RLock() - for _, id := range to.DeviceIDs() { - indexSenders, ok := m.indexSenders[id] - if !ok { - continue - } - // In case the folder was newly shared with us we already got a - // cluster config and wont necessarily get another soon - start - // sending indexes if connected. - if to.Paused { - indexSenders.pause(to.ID) - } else if !from.SharedWith(indexSenders.deviceID) || fsetNil || from.Paused { - indexSenders.resume(to, fset) - } + for _, indexRegistry := range m.indexHandlers { + indexRegistry.RegisterFolderState(to, fset, m.folderRunners[to.ID]) } m.pmut.RUnlock() @@ -601,18 +589,19 @@ func (m *model) newFolder(cfg config.FolderConfiguration, cacheIgnoredFiles bool m.fmut.Lock() defer m.fmut.Unlock() + m.addAndStartFolderLocked(cfg, fset, cacheIgnoredFiles) + // Cluster configs might be received and processed before reaching this // point, i.e. before the folder is started. If that's the case, start // index senders here. + // Care needs to be taken because we already hold fmut and the lock order + // must be the same everywhere. As fmut is acquired first, this is fine. m.pmut.RLock() - for _, id := range cfg.DeviceIDs() { - if is, ok := m.indexSenders[id]; ok { - is.resume(cfg, fset) - } + for _, indexRegistry := range m.indexHandlers { + indexRegistry.RegisterFolderState(cfg, fset, m.folderRunners[cfg.ID]) } m.pmut.RUnlock() - m.addAndStartFolderLocked(cfg, fset, cacheIgnoredFiles) return nil } @@ -1151,46 +1140,23 @@ func (m *model) handleIndex(deviceID protocol.DeviceID, folder string, fs []prot return errors.Wrap(ErrFolderPaused, folder) } - m.fmut.RLock() - files, existing := m.folderFiles[folder] - runner, running := m.folderRunners[folder] - m.fmut.RUnlock() - - if !existing { - l.Infof("%v for nonexistent folder %q", op, folder) - return errors.Wrap(ErrFolderMissing, folder) - } - - if running { - defer runner.SchedulePull() - } - m.pmut.RLock() - downloads := m.deviceDownloads[deviceID] + indexHandler, ok := m.indexHandlers[deviceID] m.pmut.RUnlock() - downloads.Update(folder, makeForgetUpdate(fs)) - - if !update { - files.Drop(deviceID) + if !ok { + // This should be impossible, as an index handler always exists for an + // open connection, and this method can't be called on a closed + // connection + m.evLogger.Log(events.Failure, "index sender does not exist for connection on which indexes were received") + l.Debugf("%v for folder (ID %q) sent from device %q: missing index handler", op, folder, deviceID) + return errors.Wrap(errors.New("index handler missing"), folder) } - for i := range fs { - // The local attributes should never be transmitted over the wire. - // Make sure they look like they weren't. - fs[i].LocalFlags = 0 - fs[i].VersionHash = nil - } - files.Update(deviceID, fs) - seq := files.Sequence(deviceID) - m.evLogger.Log(events.RemoteIndexUpdated, map[string]interface{}{ - "device": deviceID.String(), - "folder": folder, - "items": len(fs), - "sequence": seq, - "version": seq, // legacy for sequence - }) + return indexHandler.ReceiveIndex(folder, fs, update, op) +} - return nil +type clusterConfigDeviceInfo struct { + local, remote protocol.Device } func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterConfig) error { @@ -1199,8 +1165,10 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon // Also, collect a list of folders we do share, and if he's interested in // temporary indexes, subscribe the connection. + l.Debugf("Handling ClusterConfig from %v", deviceID.Short()) + m.pmut.RLock() - indexSenderRegistry, ok := m.indexSenders[deviceID] + indexHandlerRegistry, ok := m.indexHandlers[deviceID] m.pmut.RUnlock() if !ok { panic("bug: ClusterConfig called on closed or nonexistent connection") @@ -1214,9 +1182,9 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon // Assemble the device information from the connected device about // themselves and us for all folders. - ccDeviceInfos := make(map[string]*indexSenderStartInfo, len(cm.Folders)) + ccDeviceInfos := make(map[string]*clusterConfigDeviceInfo, len(cm.Folders)) for _, folder := range cm.Folders { - info := &indexSenderStartInfo{} + info := &clusterConfigDeviceInfo{} for _, dev := range folder.Devices { if dev.ID == m.id { info.local = dev @@ -1269,7 +1237,7 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon w.Wait() } - tempIndexFolders, paused, err := m.ccHandleFolders(cm.Folders, deviceCfg, ccDeviceInfos, indexSenderRegistry) + tempIndexFolders, paused, err := m.ccHandleFolders(cm.Folders, deviceCfg, ccDeviceInfos, indexHandlerRegistry) if err != nil { return err } @@ -1310,7 +1278,7 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon return nil } -func (m *model) ccHandleFolders(folders []protocol.Folder, deviceCfg config.DeviceConfiguration, ccDeviceInfos map[string]*indexSenderStartInfo, indexSenders *indexSenderRegistry) ([]string, map[string]struct{}, error) { +func (m *model) ccHandleFolders(folders []protocol.Folder, deviceCfg config.DeviceConfiguration, ccDeviceInfos map[string]*clusterConfigDeviceInfo, indexHandlers *indexHandlerRegistry) ([]string, map[string]struct{}, error) { var folderDevice config.FolderDeviceConfiguration tempIndexFolders := make([]string, 0, len(folders)) paused := make(map[string]struct{}, len(folders)) @@ -1330,7 +1298,7 @@ func (m *model) ccHandleFolders(folders []protocol.Folder, deviceCfg config.Devi folderDevice, ok = cfg.Device(deviceID) } if !ok { - indexSenders.remove(folder.ID) + indexHandlers.Remove(folder.ID) if deviceCfg.IgnoredFolder(folder.ID) { l.Infof("Ignoring folder %s from device %s since we are configured to", folder.Description(), deviceID) continue @@ -1341,7 +1309,7 @@ func (m *model) ccHandleFolders(folders []protocol.Folder, deviceCfg config.Devi if err := m.db.AddOrUpdatePendingFolder(folder.ID, of, deviceID); err != nil { l.Warnf("Failed to persist pending folder entry to database: %v", err) } - indexSenders.addPending(folder.ID, ccDeviceInfos[folder.ID]) + indexHandlers.AddIndexInfo(folder.ID, ccDeviceInfos[folder.ID]) updatedPending = append(updatedPending, updatedPendingFolder{ FolderID: folder.ID, FolderLabel: folder.Label, @@ -1359,13 +1327,13 @@ func (m *model) ccHandleFolders(folders []protocol.Folder, deviceCfg config.Devi } if folder.Paused { - indexSenders.remove(folder.ID) + indexHandlers.Remove(folder.ID) paused[cfg.ID] = struct{}{} continue } if cfg.Paused { - indexSenders.addPending(folder.ID, ccDeviceInfos[folder.ID]) + indexHandlers.AddIndexInfo(folder.ID, ccDeviceInfos[folder.ID]) continue } @@ -1403,29 +1371,10 @@ func (m *model) ccHandleFolders(folders []protocol.Folder, deviceCfg config.Devi tempIndexFolders = append(tempIndexFolders, folder.ID) } - m.fmut.RLock() - fs, ok := m.folderFiles[folder.ID] - m.fmut.RUnlock() - if !ok { - // Shouldn't happen because !cfg.Paused, but might happen - // if the folder is about to be unpaused, but not yet. - l.Debugln("ccH: no fset", folder.ID) - indexSenders.addPending(folder.ID, ccDeviceInfos[folder.ID]) - continue - } - - indexSenders.add(cfg, fs, ccDeviceInfos[folder.ID]) - - // We might already have files that we need to pull so let the - // folder runner know that it should recheck the index data. - m.fmut.RLock() - if runner := m.folderRunners[folder.ID]; runner != nil { - defer runner.SchedulePull() - } - m.fmut.RUnlock() + indexHandlers.AddIndexInfo(folder.ID, ccDeviceInfos[folder.ID]) } - indexSenders.removeAllExcept(seenFolders) + indexHandlers.RemoveAllExcept(seenFolders) for folder := range expiredPending { m.db.RemovePendingFolderForDevice(folder, deviceID) } @@ -1446,7 +1395,7 @@ func (m *model) ccHandleFolders(folders []protocol.Folder, deviceCfg config.Devi return tempIndexFolders, paused, nil } -func (m *model) ccCheckEncryption(fcfg config.FolderConfiguration, folderDevice config.FolderDeviceConfiguration, ccDeviceInfos *indexSenderStartInfo, deviceUntrusted bool) error { +func (m *model) ccCheckEncryption(fcfg config.FolderConfiguration, folderDevice config.FolderDeviceConfiguration, ccDeviceInfos *clusterConfigDeviceInfo, deviceUntrusted bool) error { hasTokenRemote := len(ccDeviceInfos.remote.EncryptionPasswordToken) > 0 hasTokenLocal := len(ccDeviceInfos.local.EncryptionPasswordToken) > 0 isEncryptedRemote := folderDevice.EncryptionPassword != "" @@ -1688,7 +1637,7 @@ func (m *model) handleDeintroductions(introducerCfg config.DeviceConfiguration, // handleAutoAccepts handles adding and sharing folders for devices that have // AutoAcceptFolders set to true. -func (m *model) handleAutoAccepts(deviceID protocol.DeviceID, folder protocol.Folder, ccDeviceInfos *indexSenderStartInfo, cfg config.FolderConfiguration, haveCfg bool, defaultPath string) (config.FolderConfiguration, bool) { +func (m *model) handleAutoAccepts(deviceID protocol.DeviceID, folder protocol.Folder, ccDeviceInfos *clusterConfigDeviceInfo, cfg config.FolderConfiguration, haveCfg bool, defaultPath string) (config.FolderConfiguration, bool) { if !haveCfg { defaultPathFs := fs.NewFilesystem(fs.FilesystemTypeBasic, defaultPath) pathAlternatives := []string{ @@ -1783,7 +1732,7 @@ func (m *model) Closed(device protocol.DeviceID, err error) { delete(m.remotePausedFolders, device) closed := m.closed[device] delete(m.closed, device) - delete(m.indexSenders, device) + delete(m.indexHandlers, device) m.pmut.Unlock() m.progressEmitter.temporaryIndexUnsubscribe(conn) @@ -2244,6 +2193,9 @@ func (m *model) AddConnection(conn protocol.Connection, hello protocol.Hello) { return } + // The slightly unusual locking sequence here is because we must acquire + // fmut before pmut. (The locks can be *released* in any order.) + m.fmut.RLock() m.pmut.Lock() if oldConn, ok := m.conn[deviceID]; ok { l.Infoln("Replacing old connection", oldConn, "with", conn, "for", deviceID) @@ -2253,9 +2205,12 @@ func (m *model) AddConnection(conn protocol.Connection, hello protocol.Hello) { // actual close without holding pmut as the connection will call // back into Closed() for the cleanup. closed := m.closed[deviceID] + m.fmut.RUnlock() m.pmut.Unlock() oldConn.Close(errReplacingConnection) <-closed + // Again, lock fmut before pmut. + m.fmut.RLock() m.pmut.Lock() } @@ -2263,7 +2218,12 @@ func (m *model) AddConnection(conn protocol.Connection, hello protocol.Hello) { closed := make(chan struct{}) m.closed[deviceID] = closed m.deviceDownloads[deviceID] = newDeviceDownloadState() - m.indexSenders[deviceID] = newIndexSenderRegistry(conn, closed, m.Supervisor, m.evLogger) + indexRegistry := newIndexHandlerRegistry(conn, m.deviceDownloads[deviceID], closed, m.Supervisor, m.evLogger) + for id, fcfg := range m.folderCfgs { + indexRegistry.RegisterFolderState(fcfg, m.folderFiles[id], m.folderRunners[id]) + } + m.indexHandlers[deviceID] = indexRegistry + m.fmut.RUnlock() // 0: default, <0: no limiting switch { case device.MaxRequestKiB > 0: @@ -3119,23 +3079,6 @@ func readOffsetIntoBuf(fs fs.Filesystem, file string, offset int64, buf []byte) return n, err } -// makeForgetUpdate takes an index update and constructs a download progress update -// causing to forget any progress for files which we've just been sent. -func makeForgetUpdate(files []protocol.FileInfo) []protocol.FileDownloadProgressUpdate { - updates := make([]protocol.FileDownloadProgressUpdate, 0, len(files)) - for _, file := range files { - if file.IsSymlink() || file.IsDirectory() || file.IsDeleted() { - continue - } - updates = append(updates, protocol.FileDownloadProgressUpdate{ - Name: file.Name, - Version: file.Version, - UpdateType: protocol.FileDownloadProgressUpdateTypeForget, - }) - } - return updates -} - // folderDeviceSet is a set of (folder, deviceID) pairs type folderDeviceSet map[string]map[protocol.DeviceID]struct{} diff --git a/lib/model/model_test.go b/lib/model/model_test.go index 7d5444148..04337e417 100644 --- a/lib/model/model_test.go +++ b/lib/model/model_test.go @@ -220,15 +220,16 @@ func BenchmarkIndex_100(b *testing.B) { } func benchmarkIndex(b *testing.B, nfiles int) { - m := setupModel(b, defaultCfgWrapper) - defer cleanupModel(m) + m, _, fcfg, wcfgCancel := setupModelWithConnection(b) + defer wcfgCancel() + defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI()) files := genFiles(nfiles) - must(b, m.Index(device1, "default", files)) + must(b, m.Index(device1, fcfg.ID, files)) b.ResetTimer() for i := 0; i < b.N; i++ { - must(b, m.Index(device1, "default", files)) + must(b, m.Index(device1, fcfg.ID, files)) } b.ReportAllocs() } @@ -246,17 +247,18 @@ func BenchmarkIndexUpdate_10000_1(b *testing.B) { } func benchmarkIndexUpdate(b *testing.B, nfiles, nufiles int) { - m := setupModel(b, defaultCfgWrapper) - defer cleanupModel(m) + m, _, fcfg, wcfgCancel := setupModelWithConnection(b) + defer wcfgCancel() + defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI()) files := genFiles(nfiles) ufiles := genFiles(nufiles) - must(b, m.Index(device1, "default", files)) + must(b, m.Index(device1, fcfg.ID, files)) b.ResetTimer() for i := 0; i < b.N; i++ { - must(b, m.IndexUpdate(device1, "default", ufiles)) + must(b, m.IndexUpdate(device1, fcfg.ID, ufiles)) } b.ReportAllocs() } @@ -1695,9 +1697,8 @@ func TestRWScanRecovery(t *testing.T) { } func TestGlobalDirectoryTree(t *testing.T) { - w, fcfg, wCancel := tmpDefaultWrapper() + m, _, fcfg, wCancel := setupModelWithConnection(t) defer wCancel() - m := setupModel(t, w) defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI()) b := func(isfile bool, path ...string) protocol.FileInfo { @@ -1999,18 +2000,18 @@ func BenchmarkTree_100_10(b *testing.B) { } func benchmarkTree(b *testing.B, n1, n2 int) { - m := newModel(b, defaultCfgWrapper, myID, "syncthing", "dev", nil) - m.ServeBackground() - defer cleanupModel(m) + m, _, fcfg, wcfgCancel := setupModelWithConnection(b) + defer wcfgCancel() + defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI()) - m.ScanFolder("default") + m.ScanFolder(fcfg.ID) files := genDeepFiles(n1, n2) - must(b, m.Index(device1, "default", files)) + must(b, m.Index(device1, fcfg.ID, files)) b.ResetTimer() for i := 0; i < b.N; i++ { - m.GlobalDirectoryTree("default", "", -1, false) + m.GlobalDirectoryTree(fcfg.ID, "", -1, false) } b.ReportAllocs() } @@ -2627,43 +2628,43 @@ func TestCustomMarkerName(t *testing.T) { } func TestRemoveDirWithContent(t *testing.T) { - defer func() { - defaultFs.RemoveAll("dirwith") - }() + m, _, fcfg, wcfgCancel := setupModelWithConnection(t) + defer wcfgCancel() + tfs := fcfg.Filesystem() + defer cleanupModelAndRemoveDir(m, tfs.URI()) - defaultFs.MkdirAll("dirwith", 0755) + tfs.MkdirAll("dirwith", 0755) content := filepath.Join("dirwith", "content") - fd, err := defaultFs.Create(content) + fd, err := tfs.Create(content) must(t, err) fd.Close() - m := setupModel(t, defaultCfgWrapper) - defer cleanupModel(m) + must(t, m.ScanFolder(fcfg.ID)) - dir, ok := m.testCurrentFolderFile("default", "dirwith") + dir, ok := m.testCurrentFolderFile(fcfg.ID, "dirwith") if !ok { t.Fatalf("Can't get dir \"dirwith\" after initial scan") } dir.Deleted = true dir.Version = dir.Version.Update(device1.Short()).Update(device1.Short()) - file, ok := m.testCurrentFolderFile("default", content) + file, ok := m.testCurrentFolderFile(fcfg.ID, content) if !ok { t.Fatalf("Can't get file \"%v\" after initial scan", content) } file.Deleted = true file.Version = file.Version.Update(device1.Short()).Update(device1.Short()) - must(t, m.IndexUpdate(device1, "default", []protocol.FileInfo{dir, file})) + must(t, m.IndexUpdate(device1, fcfg.ID, []protocol.FileInfo{dir, file})) // Is there something we could trigger on instead of just waiting? timeout := time.NewTimer(5 * time.Second) for { - dir, ok := m.testCurrentFolderFile("default", "dirwith") + dir, ok := m.testCurrentFolderFile(fcfg.ID, "dirwith") if !ok { t.Fatalf("Can't get dir \"dirwith\" after index update") } - file, ok := m.testCurrentFolderFile("default", content) + file, ok := m.testCurrentFolderFile(fcfg.ID, content) if !ok { t.Fatalf("Can't get file \"%v\" after index update", content) } @@ -3766,14 +3767,14 @@ func TestAddFolderCompletion(t *testing.T) { } func TestScanDeletedROChangedOnSR(t *testing.T) { - w, fcfg, wCancel := tmpDefaultWrapper() - defer wCancel() - fcfg.Type = config.FolderTypeReceiveOnly - setFolder(t, w, fcfg) - m := setupModel(t, w) - defer cleanupModel(m) - name := "foo" + m, _, fcfg, wCancel := setupModelWithConnection(t) ffs := fcfg.Filesystem() + defer wCancel() + defer cleanupModelAndRemoveDir(m, ffs.URI()) + fcfg.Type = config.FolderTypeReceiveOnly + setFolder(t, m.cfg, fcfg) + + name := "foo" must(t, writeFile(ffs, name, []byte(name), 0644)) m.ScanFolders() @@ -3794,7 +3795,7 @@ func TestScanDeletedROChangedOnSR(t *testing.T) { } fcfg.Type = config.FolderTypeSendReceive - setFolder(t, w, fcfg) + setFolder(t, m.cfg, fcfg) m.ScanFolders() if receiveOnlyChangedSize(t, m, fcfg.ID).Deleted != 0 { @@ -3889,6 +3890,8 @@ func TestIssue6961(t *testing.T) { } m.ServeBackground() defer cleanupModelAndRemoveDir(m, tfs.URI()) + addFakeConn(m, device1, fcfg.ID) + addFakeConn(m, device2, fcfg.ID) m.ScanFolders() name := "foo" @@ -3937,9 +3940,8 @@ func TestIssue6961(t *testing.T) { } func TestCompletionEmptyGlobal(t *testing.T) { - wcfg, fcfg, wcfgCancel := tmpDefaultWrapper() + m, _, fcfg, wcfgCancel := setupModelWithConnection(t) defer wcfgCancel() - m := setupModel(t, wcfg) defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI()) files := []protocol.FileInfo{{Name: "foo", Version: protocol.Vector{}.Update(myID.Short()), Sequence: 1}} m.fmut.Lock() @@ -3960,6 +3962,8 @@ func TestNeedMetaAfterIndexReset(t *testing.T) { addDevice2(t, w, fcfg) m := setupModel(t, w) defer cleanupModelAndRemoveDir(m, fcfg.Path) + addFakeConn(m, device1, fcfg.ID) + addFakeConn(m, device2, fcfg.ID) var seq int64 = 1 files := []protocol.FileInfo{{Name: "foo", Size: 10, Version: protocol.Vector{}.Update(device1.Short()), Sequence: seq}} @@ -4097,7 +4101,7 @@ func TestCcCheckEncryption(t *testing.T) { dcfg.EncryptionPassword = pw } - deviceInfos := &indexSenderStartInfo{ + deviceInfos := &clusterConfigDeviceInfo{ remote: protocol.Device{ID: device1, EncryptionPasswordToken: tc.tokenRemote}, local: protocol.Device{ID: myID, EncryptionPasswordToken: tc.tokenLocal}, } diff --git a/lib/model/requests_test.go b/lib/model/requests_test.go index da3e8d036..a0c491d71 100644 --- a/lib/model/requests_test.go +++ b/lib/model/requests_test.go @@ -1446,6 +1446,7 @@ func TestRequestGlobalInvalidToValid(t *testing.T) { }) must(t, err) waiter.Wait() + addFakeConn(m, device2, fcfg.ID) tfs := fcfg.Filesystem() defer cleanupModelAndRemoveDir(m, tfs.URI())