mirror of
https://github.com/octoleo/syncthing.git
synced 2025-02-02 11:58:28 +00:00
parent
a17a8cd48b
commit
5c91723ef2
403
lib/model/indexsender.go
Normal file
403
lib/model/indexsender.go
Normal file
@ -0,0 +1,403 @@
|
||||
// Copyright (C) 2020 The Syncthing Authors.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
||||
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
|
||||
package model
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/thejerf/suture"
|
||||
|
||||
"github.com/syncthing/syncthing/lib/config"
|
||||
"github.com/syncthing/syncthing/lib/db"
|
||||
"github.com/syncthing/syncthing/lib/events"
|
||||
"github.com/syncthing/syncthing/lib/protocol"
|
||||
"github.com/syncthing/syncthing/lib/util"
|
||||
)
|
||||
|
||||
type indexSender struct {
|
||||
suture.Service
|
||||
conn protocol.Connection
|
||||
folder string
|
||||
dev string
|
||||
fset *db.FileSet
|
||||
prevSequence int64
|
||||
evLogger events.Logger
|
||||
connClosed chan struct{}
|
||||
token suture.ServiceToken
|
||||
pauseChan chan struct{}
|
||||
resumeChan chan *db.FileSet
|
||||
}
|
||||
|
||||
func (s *indexSender) serve(ctx context.Context) {
|
||||
var err error
|
||||
|
||||
l.Debugf("Starting indexSender for %s to %s at %s (slv=%d)", s.folder, s.dev, s.conn, s.prevSequence)
|
||||
defer l.Debugf("Exiting indexSender for %s to %s at %s: %v", s.folder, s.dev, s.conn, err)
|
||||
|
||||
// We need to send one index, regardless of whether there is something to send or not
|
||||
err = s.sendIndexTo(ctx)
|
||||
|
||||
// Subscribe to LocalIndexUpdated (we have new information to send) and
|
||||
// DeviceDisconnected (it might be us who disconnected, so we should
|
||||
// exit).
|
||||
sub := s.evLogger.Subscribe(events.LocalIndexUpdated | events.DeviceDisconnected)
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
paused := false
|
||||
evChan := sub.C()
|
||||
ticker := time.NewTicker(time.Minute)
|
||||
defer ticker.Stop()
|
||||
|
||||
for err == nil {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-s.connClosed:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// While we have sent a sequence at least equal to the one
|
||||
// currently in the database, wait for the local index to update. The
|
||||
// local index may update for other folders than the one we are
|
||||
// sending for.
|
||||
if s.fset.Sequence(protocol.LocalDeviceID) <= s.prevSequence {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-s.connClosed:
|
||||
return
|
||||
case <-evChan:
|
||||
case <-ticker.C:
|
||||
case <-s.pauseChan:
|
||||
paused = true
|
||||
case s.fset = <-s.resumeChan:
|
||||
paused = false
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
if !paused {
|
||||
err = s.sendIndexTo(ctx)
|
||||
}
|
||||
|
||||
// Wait a short amount of time before entering the next loop. If there
|
||||
// are continuous changes happening to the local index, this gives us
|
||||
// time to batch them up a little.
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
// Complete implements the suture.IsCompletable interface. When Serve terminates
|
||||
// before Stop is called, the supervisor will check for this method and if it
|
||||
// returns true removes the service instead of restarting it. Here it always
|
||||
// returns true, as indexSender only terminates when a connection is
|
||||
// closed/has failed, in which case retrying doesn't help.
|
||||
func (s *indexSender) Complete() bool { return true }
|
||||
|
||||
func (s *indexSender) resume(fset *db.FileSet) {
|
||||
select {
|
||||
case <-s.connClosed:
|
||||
case s.resumeChan <- fset:
|
||||
}
|
||||
}
|
||||
|
||||
func (s *indexSender) pause() {
|
||||
select {
|
||||
case <-s.connClosed:
|
||||
case s.pauseChan <- struct{}{}:
|
||||
}
|
||||
}
|
||||
|
||||
// sendIndexTo sends file infos with a sequence number higher than prevSequence and
|
||||
// returns the highest sent sequence number.
|
||||
func (s *indexSender) sendIndexTo(ctx context.Context) error {
|
||||
initial := s.prevSequence == 0
|
||||
batch := newFileInfoBatch(nil)
|
||||
batch.flushFn = func(fs []protocol.FileInfo) error {
|
||||
l.Debugf("%v: Sending %d files (<%d bytes)", s, len(batch.infos), batch.size)
|
||||
if initial {
|
||||
initial = false
|
||||
return s.conn.Index(ctx, s.folder, fs)
|
||||
}
|
||||
return s.conn.IndexUpdate(ctx, s.folder, fs)
|
||||
}
|
||||
|
||||
var err error
|
||||
var f protocol.FileInfo
|
||||
snap := s.fset.Snapshot()
|
||||
defer snap.Release()
|
||||
previousWasDelete := false
|
||||
snap.WithHaveSequence(s.prevSequence+1, func(fi protocol.FileIntf) bool {
|
||||
// This is to make sure that renames (which is an add followed by a delete) land in the same batch.
|
||||
// Even if the batch is full, we allow a last delete to slip in, we do this by making sure that
|
||||
// the batch ends with a non-delete, or that the last item in the batch is already a delete
|
||||
if batch.full() && (!fi.IsDeleted() || previousWasDelete) {
|
||||
if err = batch.flush(); err != nil {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
if shouldDebug() {
|
||||
if fi.SequenceNo() < s.prevSequence+1 {
|
||||
panic(fmt.Sprintln("sequence lower than requested, got:", fi.SequenceNo(), ", asked to start at:", s.prevSequence+1))
|
||||
}
|
||||
}
|
||||
|
||||
if f.Sequence > 0 && fi.SequenceNo() <= f.Sequence {
|
||||
l.Warnln("Non-increasing sequence detected: Checking and repairing the db...")
|
||||
// Abort this round of index sending - the next one will pick
|
||||
// up from the last successful one with the repeaired db.
|
||||
defer func() {
|
||||
if fixed, dbErr := s.fset.RepairSequence(); dbErr != nil {
|
||||
l.Warnln("Failed repairing sequence entries:", dbErr)
|
||||
panic("Failed repairing sequence entries")
|
||||
} else {
|
||||
s.evLogger.Log(events.Failure, "detected and repaired non-increasing sequence")
|
||||
l.Infof("Repaired %v sequence entries in database", fixed)
|
||||
}
|
||||
}()
|
||||
return false
|
||||
}
|
||||
|
||||
f = fi.(protocol.FileInfo)
|
||||
|
||||
// Mark the file as invalid if any of the local bad stuff flags are set.
|
||||
f.RawInvalid = f.IsInvalid()
|
||||
// If the file is marked LocalReceive (i.e., changed locally on a
|
||||
// receive only folder) we do not want it to ever become the
|
||||
// globally best version, invalid or not.
|
||||
if f.IsReceiveOnlyChanged() {
|
||||
f.Version = protocol.Vector{}
|
||||
}
|
||||
|
||||
// never sent externally
|
||||
f.LocalFlags = 0
|
||||
f.VersionHash = nil
|
||||
|
||||
previousWasDelete = f.IsDeleted()
|
||||
|
||||
batch.append(f)
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = batch.flush()
|
||||
|
||||
// True if there was nothing to be sent
|
||||
if f.Sequence == 0 {
|
||||
return err
|
||||
}
|
||||
|
||||
s.prevSequence = f.Sequence
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *indexSender) String() string {
|
||||
return fmt.Sprintf("indexSender@%p for %s to %s at %s", s, s.folder, s.dev, s.conn)
|
||||
}
|
||||
|
||||
type indexSenderRegistry struct {
|
||||
deviceID protocol.DeviceID
|
||||
sup *suture.Supervisor
|
||||
evLogger events.Logger
|
||||
conn protocol.Connection
|
||||
closed chan struct{}
|
||||
indexSenders map[string]*indexSender
|
||||
startInfos map[string]*indexSenderStartInfo
|
||||
mut sync.Mutex
|
||||
}
|
||||
|
||||
func newIndexSenderRegistry(conn protocol.Connection, closed chan struct{}, sup *suture.Supervisor, evLogger events.Logger) *indexSenderRegistry {
|
||||
return &indexSenderRegistry{
|
||||
deviceID: conn.ID(),
|
||||
conn: conn,
|
||||
closed: closed,
|
||||
sup: sup,
|
||||
evLogger: evLogger,
|
||||
indexSenders: make(map[string]*indexSender),
|
||||
startInfos: make(map[string]*indexSenderStartInfo),
|
||||
mut: sync.Mutex{},
|
||||
}
|
||||
}
|
||||
|
||||
// add starts an index sender for given folder.
|
||||
// If an index sender is already running, it will be stopped first.
|
||||
func (r *indexSenderRegistry) add(folder config.FolderConfiguration, fset *db.FileSet, local, remote protocol.Device) {
|
||||
r.mut.Lock()
|
||||
r.addLocked(folder, fset, remote, local)
|
||||
r.mut.Unlock()
|
||||
}
|
||||
|
||||
func (r *indexSenderRegistry) addLocked(folder config.FolderConfiguration, fset *db.FileSet, local, remote protocol.Device) {
|
||||
if is, ok := r.indexSenders[folder.ID]; ok {
|
||||
r.sup.RemoveAndWait(is.token, 0)
|
||||
delete(r.indexSenders, folder.ID)
|
||||
}
|
||||
if _, ok := r.startInfos[folder.ID]; ok {
|
||||
delete(r.startInfos, folder.ID)
|
||||
}
|
||||
|
||||
myIndexID := fset.IndexID(protocol.LocalDeviceID)
|
||||
mySequence := fset.Sequence(protocol.LocalDeviceID)
|
||||
var startSequence int64
|
||||
|
||||
// This is the other side's description of what it knows
|
||||
// about us. Lets check to see if we can start sending index
|
||||
// updates directly or need to send the index from start...
|
||||
|
||||
if local.IndexID == myIndexID {
|
||||
// They say they've seen our index ID before, so we can
|
||||
// send a delta update only.
|
||||
|
||||
if local.MaxSequence > mySequence {
|
||||
// Safety check. They claim to have more or newer
|
||||
// index data than we have - either we have lost
|
||||
// index data, or reset the index without resetting
|
||||
// the IndexID, or something else weird has
|
||||
// happened. We send a full index to reset the
|
||||
// situation.
|
||||
l.Infof("Device %v folder %s is delta index compatible, but seems out of sync with reality", r.deviceID, folder.Description())
|
||||
startSequence = 0
|
||||
} else {
|
||||
l.Debugf("Device %v folder %s is delta index compatible (mlv=%d)", r.deviceID, folder.Description(), local.MaxSequence)
|
||||
startSequence = local.MaxSequence
|
||||
}
|
||||
} else if local.IndexID != 0 {
|
||||
// They say they've seen an index ID from us, but it's
|
||||
// not the right one. Either they are confused or we
|
||||
// must have reset our database since last talking to
|
||||
// them. We'll start with a full index transfer.
|
||||
l.Infof("Device %v folder %s has mismatching index ID for us (%v != %v)", r.deviceID, folder.Description(), local.IndexID, myIndexID)
|
||||
startSequence = 0
|
||||
}
|
||||
|
||||
// This is the other side's description of themselves. We
|
||||
// check to see that it matches the IndexID we have on file,
|
||||
// otherwise we drop our old index data and expect to get a
|
||||
// completely new set.
|
||||
|
||||
theirIndexID := fset.IndexID(r.deviceID)
|
||||
if remote.IndexID == 0 {
|
||||
// They're not announcing an index ID. This means they
|
||||
// do not support delta indexes and we should clear any
|
||||
// information we have from them before accepting their
|
||||
// index, which will presumably be a full index.
|
||||
fset.Drop(r.deviceID)
|
||||
} else if remote.IndexID != theirIndexID {
|
||||
// The index ID we have on file is not what they're
|
||||
// announcing. They must have reset their database and
|
||||
// will probably send us a full index. We drop any
|
||||
// information we have and remember this new index ID
|
||||
// instead.
|
||||
l.Infof("Device %v folder %s has a new index ID (%v)", r.deviceID, folder.Description(), remote.IndexID)
|
||||
fset.Drop(r.deviceID)
|
||||
fset.SetIndexID(r.deviceID, remote.IndexID)
|
||||
}
|
||||
|
||||
is := &indexSender{
|
||||
conn: r.conn,
|
||||
connClosed: r.closed,
|
||||
folder: folder.ID,
|
||||
fset: fset,
|
||||
prevSequence: startSequence,
|
||||
evLogger: r.evLogger,
|
||||
pauseChan: make(chan struct{}),
|
||||
resumeChan: make(chan *db.FileSet),
|
||||
}
|
||||
is.Service = util.AsService(is.serve, is.String())
|
||||
is.token = r.sup.Add(is)
|
||||
r.indexSenders[folder.ID] = is
|
||||
}
|
||||
|
||||
// addPaused stores the given info to start an index sender once resume is called
|
||||
// for this folder.
|
||||
// If an index sender is already running, it will be stopped.
|
||||
func (r *indexSenderRegistry) addPaused(folder config.FolderConfiguration, local, remote protocol.Device) {
|
||||
r.mut.Lock()
|
||||
defer r.mut.Unlock()
|
||||
|
||||
if is, ok := r.indexSenders[folder.ID]; ok {
|
||||
r.sup.RemoveAndWait(is.token, 0)
|
||||
delete(r.indexSenders, folder.ID)
|
||||
}
|
||||
r.startInfos[folder.ID] = &indexSenderStartInfo{local, remote}
|
||||
}
|
||||
|
||||
// remove stops a running index sender or removes one pending to be started.
|
||||
// It is a noop if the folder isn't known.
|
||||
func (r *indexSenderRegistry) remove(folder string) {
|
||||
r.mut.Lock()
|
||||
defer r.mut.Unlock()
|
||||
|
||||
if is, ok := r.indexSenders[folder]; ok {
|
||||
r.sup.RemoveAndWait(is.token, 0)
|
||||
delete(r.indexSenders, folder)
|
||||
}
|
||||
delete(r.startInfos, folder)
|
||||
}
|
||||
|
||||
// removeAllExcept stops all running index senders and removes those pending to be started,
|
||||
// except mentioned ones.
|
||||
// It is a noop if the folder isn't known.
|
||||
func (r *indexSenderRegistry) removeAllExcept(except map[string]struct{}) {
|
||||
r.mut.Lock()
|
||||
defer r.mut.Unlock()
|
||||
|
||||
for folder, is := range r.indexSenders {
|
||||
if _, ok := except[folder]; !ok {
|
||||
r.sup.RemoveAndWait(is.token, 0)
|
||||
delete(r.indexSenders, folder)
|
||||
}
|
||||
}
|
||||
for folder := range r.indexSenders {
|
||||
if _, ok := except[folder]; !ok {
|
||||
delete(r.startInfos, folder)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// pause stops a running index sender.
|
||||
// It is a noop if the folder isn't known or has not been started yet.
|
||||
func (r *indexSenderRegistry) pause(folder string) {
|
||||
r.mut.Lock()
|
||||
defer r.mut.Unlock()
|
||||
|
||||
if is, ok := r.indexSenders[folder]; ok {
|
||||
is.pause()
|
||||
}
|
||||
}
|
||||
|
||||
// resume unpauses an already running index sender or starts it, if it was added
|
||||
// while paused.
|
||||
// It is a noop if the folder isn't known.
|
||||
func (r *indexSenderRegistry) resume(folder config.FolderConfiguration, fset *db.FileSet) {
|
||||
r.mut.Lock()
|
||||
defer r.mut.Unlock()
|
||||
|
||||
is, isOk := r.indexSenders[folder.ID]
|
||||
if info, ok := r.startInfos[folder.ID]; ok {
|
||||
if isOk {
|
||||
r.sup.RemoveAndWait(is.token, 0)
|
||||
delete(r.indexSenders, folder.ID)
|
||||
}
|
||||
r.addLocked(folder, fset, info.local, info.remote)
|
||||
delete(r.startInfos, folder.ID)
|
||||
} else if isOk {
|
||||
is.resume(fset)
|
||||
}
|
||||
}
|
||||
|
||||
type indexSenderStartInfo struct {
|
||||
local, remote protocol.Device
|
||||
}
|
@ -35,7 +35,6 @@ import (
|
||||
"github.com/syncthing/syncthing/lib/stats"
|
||||
"github.com/syncthing/syncthing/lib/sync"
|
||||
"github.com/syncthing/syncthing/lib/ur/contract"
|
||||
"github.com/syncthing/syncthing/lib/util"
|
||||
"github.com/syncthing/syncthing/lib/versioner"
|
||||
)
|
||||
|
||||
@ -149,8 +148,8 @@ type model struct {
|
||||
closed map[protocol.DeviceID]chan struct{}
|
||||
helloMessages map[protocol.DeviceID]protocol.Hello
|
||||
deviceDownloads map[protocol.DeviceID]*deviceDownloadState
|
||||
remotePausedFolders map[protocol.DeviceID][]string // deviceID -> folders
|
||||
indexSenderTokens map[protocol.DeviceID][]suture.ServiceToken
|
||||
remotePausedFolders map[protocol.DeviceID]map[string]struct{} // deviceID -> folders
|
||||
indexSenders map[protocol.DeviceID]*indexSenderRegistry
|
||||
|
||||
foldersRunning int32 // for testing only
|
||||
}
|
||||
@ -175,6 +174,8 @@ var (
|
||||
errIgnoredFolderRemoved = errors.New("folder no longer ignored")
|
||||
errReplacingConnection = errors.New("replacing connection")
|
||||
errStopped = errors.New("Syncthing is being stopped")
|
||||
errMissingRemoteInClusterConfig = errors.New("remote device missing in cluster config")
|
||||
errMissingLocalInClusterConfig = errors.New("local device missing in cluster config")
|
||||
)
|
||||
|
||||
// NewModel creates and starts a new model. The model starts in read-only mode,
|
||||
@ -222,8 +223,8 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio
|
||||
closed: make(map[protocol.DeviceID]chan struct{}),
|
||||
helloMessages: make(map[protocol.DeviceID]protocol.Hello),
|
||||
deviceDownloads: make(map[protocol.DeviceID]*deviceDownloadState),
|
||||
remotePausedFolders: make(map[protocol.DeviceID][]string),
|
||||
indexSenderTokens: make(map[protocol.DeviceID][]suture.ServiceToken),
|
||||
remotePausedFolders: make(map[protocol.DeviceID]map[string]struct{}),
|
||||
indexSenders: make(map[protocol.DeviceID]*indexSenderRegistry),
|
||||
}
|
||||
for devID := range cfg.Devices() {
|
||||
m.deviceStatRefs[devID] = stats.NewDeviceStatisticsReference(m.db, devID.String())
|
||||
@ -405,7 +406,10 @@ func (m *model) removeFolder(cfg config.FolderConfiguration) {
|
||||
m.RemoveAndWait(token, 0)
|
||||
}
|
||||
|
||||
// We need to hold both fmut and pmut and must acquire locks in the same
|
||||
// order always. (The locks can be *released* in any order.)
|
||||
m.fmut.Lock()
|
||||
m.pmut.RLock()
|
||||
|
||||
isPathUnique := true
|
||||
for folderID, folderCfg := range m.folderCfgs {
|
||||
@ -420,8 +424,12 @@ func (m *model) removeFolder(cfg config.FolderConfiguration) {
|
||||
}
|
||||
|
||||
m.cleanupFolderLocked(cfg)
|
||||
for _, r := range m.indexSenders {
|
||||
r.remove(cfg.ID)
|
||||
}
|
||||
|
||||
m.fmut.Unlock()
|
||||
m.pmut.RUnlock()
|
||||
|
||||
// Remove it from the database
|
||||
db.DropFolder(m.db, cfg.ID)
|
||||
@ -472,14 +480,32 @@ func (m *model) restartFolder(from, to config.FolderConfiguration, cacheIgnoredF
|
||||
fset := m.folderFiles[folder]
|
||||
|
||||
m.cleanupFolderLocked(from)
|
||||
if !to.Paused {
|
||||
if fset == nil {
|
||||
if to.Paused {
|
||||
// Care needs to be taken because we already hold fmut and the lock order
|
||||
// must be the same everywhere. As fmut is acquired first, this is fine.
|
||||
m.pmut.RLock()
|
||||
for _, r := range m.indexSenders {
|
||||
r.pause(to.ID)
|
||||
}
|
||||
m.pmut.RUnlock()
|
||||
} else {
|
||||
fsetNil := fset == nil
|
||||
if fsetNil {
|
||||
// Create a new fset. Might take a while and we do it under
|
||||
// locking, but it's unsafe to create fset:s concurrently so
|
||||
// that's the price we pay.
|
||||
fset = db.NewFileSet(folder, to.Filesystem(), m.db)
|
||||
}
|
||||
m.addAndStartFolderLocked(to, fset, cacheIgnoredFiles)
|
||||
if fsetNil || from.Paused {
|
||||
for _, devID := range to.DeviceIDs() {
|
||||
indexSenders, ok := m.indexSenders[devID]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
indexSenders.resume(to, fset)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var infoMsg string
|
||||
@ -979,11 +1005,7 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon
|
||||
tempIndexFolders := make([]string, 0, len(cm.Folders))
|
||||
|
||||
m.pmut.RLock()
|
||||
conn, ok := m.conn[deviceID]
|
||||
closed := m.closed[deviceID]
|
||||
for _, token := range m.indexSenderTokens[deviceID] {
|
||||
m.RemoveAndWait(token, 0)
|
||||
}
|
||||
indexSenderRegistry, ok := m.indexSenders[deviceID]
|
||||
m.pmut.RUnlock()
|
||||
if !ok {
|
||||
panic("bug: ClusterConfig called on closed or nonexistent connection")
|
||||
@ -1015,11 +1037,14 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon
|
||||
}
|
||||
}
|
||||
|
||||
var paused []string
|
||||
indexSenderTokens := make([]suture.ServiceToken, 0, len(cm.Folders))
|
||||
paused := make(map[string]struct{}, len(cm.Folders))
|
||||
seenFolders := make(map[string]struct{}, len(cm.Folders))
|
||||
for _, folder := range cm.Folders {
|
||||
seenFolders[folder.ID] = struct{}{}
|
||||
|
||||
cfg, ok := m.cfg.Folder(folder.ID)
|
||||
if !ok || !cfg.SharedWith(deviceID) {
|
||||
indexSenderRegistry.remove(folder.ID)
|
||||
if deviceCfg.IgnoredFolder(folder.ID) {
|
||||
l.Infof("Ignoring folder %s from device %s since we are configured to", folder.Description(), deviceID)
|
||||
continue
|
||||
@ -1034,13 +1059,41 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon
|
||||
l.Infof("Unexpected folder %s sent from device %q; ensure that the folder exists and that this device is selected under \"Share With\" in the folder configuration.", folder.Description(), deviceID)
|
||||
continue
|
||||
}
|
||||
|
||||
var foundRemote, foundLocal bool
|
||||
var remoteDeviceInfo, localDeviceInfo protocol.Device
|
||||
for _, dev := range folder.Devices {
|
||||
if dev.ID == m.id {
|
||||
localDeviceInfo = dev
|
||||
foundLocal = true
|
||||
} else if dev.ID == deviceID {
|
||||
remoteDeviceInfo = dev
|
||||
foundRemote = true
|
||||
}
|
||||
if foundRemote && foundLocal {
|
||||
break
|
||||
}
|
||||
}
|
||||
if !foundRemote {
|
||||
l.Infof("Device %v sent cluster-config without the device info for the remote on folder %v", deviceID, folder.Description())
|
||||
return errMissingRemoteInClusterConfig
|
||||
}
|
||||
if !foundLocal {
|
||||
l.Infof("Device %v sent cluster-config without the device info for us locally on folder %v", deviceID, folder.Description())
|
||||
return errMissingLocalInClusterConfig
|
||||
}
|
||||
|
||||
if folder.Paused {
|
||||
paused = append(paused, folder.ID)
|
||||
indexSenderRegistry.remove(folder.ID)
|
||||
paused[cfg.ID] = struct{}{}
|
||||
continue
|
||||
}
|
||||
|
||||
if cfg.Paused {
|
||||
indexSenderRegistry.addPaused(cfg, localDeviceInfo, remoteDeviceInfo)
|
||||
continue
|
||||
}
|
||||
|
||||
m.fmut.RLock()
|
||||
fs, ok := m.folderFiles[folder.ID]
|
||||
m.fmut.RUnlock()
|
||||
@ -1054,93 +1107,21 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon
|
||||
tempIndexFolders = append(tempIndexFolders, folder.ID)
|
||||
}
|
||||
|
||||
myIndexID := fs.IndexID(protocol.LocalDeviceID)
|
||||
mySequence := fs.Sequence(protocol.LocalDeviceID)
|
||||
var startSequence int64
|
||||
indexSenderRegistry.add(cfg, fs, localDeviceInfo, remoteDeviceInfo)
|
||||
|
||||
for _, dev := range folder.Devices {
|
||||
if dev.ID == m.id {
|
||||
// This is the other side's description of what it knows
|
||||
// about us. Lets check to see if we can start sending index
|
||||
// updates directly or need to send the index from start...
|
||||
|
||||
if dev.IndexID == myIndexID {
|
||||
// They say they've seen our index ID before, so we can
|
||||
// send a delta update only.
|
||||
|
||||
if dev.MaxSequence > mySequence {
|
||||
// Safety check. They claim to have more or newer
|
||||
// index data than we have - either we have lost
|
||||
// index data, or reset the index without resetting
|
||||
// the IndexID, or something else weird has
|
||||
// happened. We send a full index to reset the
|
||||
// situation.
|
||||
l.Infof("Device %v folder %s is delta index compatible, but seems out of sync with reality", deviceID, folder.Description())
|
||||
startSequence = 0
|
||||
continue
|
||||
}
|
||||
|
||||
l.Debugf("Device %v folder %s is delta index compatible (mlv=%d)", deviceID, folder.Description(), dev.MaxSequence)
|
||||
startSequence = dev.MaxSequence
|
||||
} else if dev.IndexID != 0 {
|
||||
// They say they've seen an index ID from us, but it's
|
||||
// not the right one. Either they are confused or we
|
||||
// must have reset our database since last talking to
|
||||
// them. We'll start with a full index transfer.
|
||||
l.Infof("Device %v folder %s has mismatching index ID for us (%v != %v)", deviceID, folder.Description(), dev.IndexID, myIndexID)
|
||||
startSequence = 0
|
||||
}
|
||||
} else if dev.ID == deviceID {
|
||||
// This is the other side's description of themselves. We
|
||||
// check to see that it matches the IndexID we have on file,
|
||||
// otherwise we drop our old index data and expect to get a
|
||||
// completely new set.
|
||||
|
||||
theirIndexID := fs.IndexID(deviceID)
|
||||
if dev.IndexID == 0 {
|
||||
// They're not announcing an index ID. This means they
|
||||
// do not support delta indexes and we should clear any
|
||||
// information we have from them before accepting their
|
||||
// index, which will presumably be a full index.
|
||||
fs.Drop(deviceID)
|
||||
} else if dev.IndexID != theirIndexID {
|
||||
// The index ID we have on file is not what they're
|
||||
// announcing. They must have reset their database and
|
||||
// will probably send us a full index. We drop any
|
||||
// information we have and remember this new index ID
|
||||
// instead.
|
||||
l.Infof("Device %v folder %s has a new index ID (%v)", deviceID, folder.Description(), dev.IndexID)
|
||||
fs.Drop(deviceID)
|
||||
fs.SetIndexID(deviceID, dev.IndexID)
|
||||
} else {
|
||||
// They're sending a recognized index ID and will most
|
||||
// likely use delta indexes. We might already have files
|
||||
// that we need to pull so let the folder runner know
|
||||
// that it should recheck the index data.
|
||||
// We might already have files that we need to pull so let the
|
||||
// folder runner know that it should recheck the index data.
|
||||
m.fmut.RLock()
|
||||
if runner := m.folderRunners[folder.ID]; runner != nil {
|
||||
defer runner.SchedulePull()
|
||||
}
|
||||
m.fmut.RUnlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
is := &indexSender{
|
||||
conn: conn,
|
||||
connClosed: closed,
|
||||
folder: folder.ID,
|
||||
fset: fs,
|
||||
prevSequence: startSequence,
|
||||
evLogger: m.evLogger,
|
||||
}
|
||||
is.Service = util.AsService(is.serve, is.String())
|
||||
indexSenderTokens = append(indexSenderTokens, m.Add(is))
|
||||
}
|
||||
indexSenderRegistry.removeAllExcept(seenFolders)
|
||||
|
||||
m.pmut.Lock()
|
||||
m.remotePausedFolders[deviceID] = paused
|
||||
m.indexSenderTokens[deviceID] = indexSenderTokens
|
||||
m.pmut.Unlock()
|
||||
|
||||
// This breaks if we send multiple CM messages during the same connection.
|
||||
@ -1376,6 +1357,7 @@ func (m *model) Closed(conn protocol.Connection, err error) {
|
||||
delete(m.remotePausedFolders, device)
|
||||
closed := m.closed[device]
|
||||
delete(m.closed, device)
|
||||
delete(m.indexSenders, device)
|
||||
m.pmut.Unlock()
|
||||
|
||||
m.progressEmitter.temporaryIndexUnsubscribe(conn)
|
||||
@ -1779,8 +1761,10 @@ func (m *model) AddConnection(conn connections.Connection, hello protocol.Hello)
|
||||
}
|
||||
|
||||
m.conn[deviceID] = conn
|
||||
m.closed[deviceID] = make(chan struct{})
|
||||
closed := make(chan struct{})
|
||||
m.closed[deviceID] = closed
|
||||
m.deviceDownloads[deviceID] = newDeviceDownloadState()
|
||||
m.indexSenders[deviceID] = newIndexSenderRegistry(conn, closed, m.Supervisor, m.evLogger)
|
||||
// 0: default, <0: no limiting
|
||||
switch {
|
||||
case device.MaxRequestKiB > 0:
|
||||
@ -1857,168 +1841,6 @@ func (m *model) deviceWasSeen(deviceID protocol.DeviceID) {
|
||||
}
|
||||
}
|
||||
|
||||
type indexSender struct {
|
||||
suture.Service
|
||||
conn protocol.Connection
|
||||
folder string
|
||||
dev string
|
||||
fset *db.FileSet
|
||||
prevSequence int64
|
||||
evLogger events.Logger
|
||||
connClosed chan struct{}
|
||||
}
|
||||
|
||||
func (s *indexSender) serve(ctx context.Context) {
|
||||
var err error
|
||||
|
||||
l.Debugf("Starting indexSender for %s to %s at %s (slv=%d)", s.folder, s.dev, s.conn, s.prevSequence)
|
||||
defer l.Debugf("Exiting indexSender for %s to %s at %s: %v", s.folder, s.dev, s.conn, err)
|
||||
|
||||
// We need to send one index, regardless of whether there is something to send or not
|
||||
err = s.sendIndexTo(ctx)
|
||||
|
||||
// Subscribe to LocalIndexUpdated (we have new information to send) and
|
||||
// DeviceDisconnected (it might be us who disconnected, so we should
|
||||
// exit).
|
||||
sub := s.evLogger.Subscribe(events.LocalIndexUpdated | events.DeviceDisconnected)
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
evChan := sub.C()
|
||||
ticker := time.NewTicker(time.Minute)
|
||||
defer ticker.Stop()
|
||||
|
||||
for err == nil {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-s.connClosed:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// While we have sent a sequence at least equal to the one
|
||||
// currently in the database, wait for the local index to update. The
|
||||
// local index may update for other folders than the one we are
|
||||
// sending for.
|
||||
if s.fset.Sequence(protocol.LocalDeviceID) <= s.prevSequence {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-s.connClosed:
|
||||
return
|
||||
case <-evChan:
|
||||
case <-ticker.C:
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
err = s.sendIndexTo(ctx)
|
||||
|
||||
// Wait a short amount of time before entering the next loop. If there
|
||||
// are continuous changes happening to the local index, this gives us
|
||||
// time to batch them up a little.
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
// Complete implements the suture.IsCompletable interface. When Serve terminates
|
||||
// before Stop is called, the supervisor will check for this method and if it
|
||||
// returns true removes the service instead of restarting it. Here it always
|
||||
// returns true, as indexSender only terminates when a connection is
|
||||
// closed/has failed, in which case retrying doesn't help.
|
||||
func (s *indexSender) Complete() bool { return true }
|
||||
|
||||
// sendIndexTo sends file infos with a sequence number higher than prevSequence and
|
||||
// returns the highest sent sequence number.
|
||||
func (s *indexSender) sendIndexTo(ctx context.Context) error {
|
||||
initial := s.prevSequence == 0
|
||||
batch := newFileInfoBatch(nil)
|
||||
batch.flushFn = func(fs []protocol.FileInfo) error {
|
||||
l.Debugf("%v: Sending %d files (<%d bytes)", s, len(batch.infos), batch.size)
|
||||
if initial {
|
||||
initial = false
|
||||
return s.conn.Index(ctx, s.folder, fs)
|
||||
}
|
||||
return s.conn.IndexUpdate(ctx, s.folder, fs)
|
||||
}
|
||||
|
||||
var err error
|
||||
var f protocol.FileInfo
|
||||
snap := s.fset.Snapshot()
|
||||
defer snap.Release()
|
||||
previousWasDelete := false
|
||||
snap.WithHaveSequence(s.prevSequence+1, func(fi protocol.FileIntf) bool {
|
||||
// This is to make sure that renames (which is an add followed by a delete) land in the same batch.
|
||||
// Even if the batch is full, we allow a last delete to slip in, we do this by making sure that
|
||||
// the batch ends with a non-delete, or that the last item in the batch is already a delete
|
||||
if batch.full() && (!fi.IsDeleted() || previousWasDelete) {
|
||||
if err = batch.flush(); err != nil {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
if shouldDebug() {
|
||||
if fi.SequenceNo() < s.prevSequence+1 {
|
||||
panic(fmt.Sprintln("sequence lower than requested, got:", fi.SequenceNo(), ", asked to start at:", s.prevSequence+1))
|
||||
}
|
||||
}
|
||||
|
||||
if f.Sequence > 0 && fi.SequenceNo() <= f.Sequence {
|
||||
l.Warnln("Non-increasing sequence detected: Checking and repairing the db...")
|
||||
// Abort this round of index sending - the next one will pick
|
||||
// up from the last successful one with the repeaired db.
|
||||
defer func() {
|
||||
if fixed, dbErr := s.fset.RepairSequence(); dbErr != nil {
|
||||
l.Warnln("Failed repairing sequence entries:", dbErr)
|
||||
panic("Failed repairing sequence entries")
|
||||
} else {
|
||||
s.evLogger.Log(events.Failure, "detected and repaired non-increasing sequence")
|
||||
l.Infof("Repaired %v sequence entries in database", fixed)
|
||||
}
|
||||
}()
|
||||
return false
|
||||
}
|
||||
|
||||
f = fi.(protocol.FileInfo)
|
||||
|
||||
// Mark the file as invalid if any of the local bad stuff flags are set.
|
||||
f.RawInvalid = f.IsInvalid()
|
||||
// If the file is marked LocalReceive (i.e., changed locally on a
|
||||
// receive only folder) we do not want it to ever become the
|
||||
// globally best version, invalid or not.
|
||||
if f.IsReceiveOnlyChanged() {
|
||||
f.Version = protocol.Vector{}
|
||||
}
|
||||
|
||||
// never sent externally
|
||||
f.LocalFlags = 0
|
||||
f.VersionHash = nil
|
||||
|
||||
previousWasDelete = f.IsDeleted()
|
||||
|
||||
batch.append(f)
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = batch.flush()
|
||||
|
||||
// True if there was nothing to be sent
|
||||
if f.Sequence == 0 {
|
||||
return err
|
||||
}
|
||||
|
||||
s.prevSequence = f.Sequence
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *indexSender) String() string {
|
||||
return fmt.Sprintf("indexSender@%p for %s to %s at %s", s, s.folder, s.dev, s.conn)
|
||||
}
|
||||
|
||||
func (m *model) requestGlobal(ctx context.Context, deviceID protocol.DeviceID, folder, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {
|
||||
m.pmut.RLock()
|
||||
nc, ok := m.conn[deviceID]
|
||||
@ -2373,12 +2195,12 @@ func (m *model) Availability(folder string, file protocol.FileInfo, block protoc
|
||||
var availabilities []Availability
|
||||
snap := fs.Snapshot()
|
||||
defer snap.Release()
|
||||
next:
|
||||
for _, device := range snap.Availability(file.Name) {
|
||||
for _, pausedFolder := range m.remotePausedFolders[device] {
|
||||
if pausedFolder == folder {
|
||||
continue next
|
||||
if _, ok := m.remotePausedFolders[device]; !ok {
|
||||
continue
|
||||
}
|
||||
if _, ok := m.remotePausedFolders[device][folder]; ok {
|
||||
continue
|
||||
}
|
||||
_, ok := m.conn[device]
|
||||
if ok {
|
||||
|
@ -496,23 +496,16 @@ func TestIntroducer(t *testing.T) {
|
||||
},
|
||||
},
|
||||
})
|
||||
m.ClusterConfig(device1, protocol.ClusterConfig{
|
||||
Folders: []protocol.Folder{
|
||||
{
|
||||
ID: "folder1",
|
||||
Devices: []protocol.Device{
|
||||
{
|
||||
cc := basicClusterConfig(myID, device1, "folder1")
|
||||
cc.Folders[0].Devices = append(cc.Folders[0].Devices, protocol.Device{
|
||||
ID: device2,
|
||||
Introducer: true,
|
||||
SkipIntroductionRemovals: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
m.ClusterConfig(device1, cc)
|
||||
|
||||
if newDev, ok := m.cfg.Device(device2); !ok || !newDev.Introducer || !newDev.SkipIntroductionRemovals {
|
||||
t.Error("devie 2 missing or wrong flags")
|
||||
t.Error("device 2 missing or wrong flags")
|
||||
}
|
||||
|
||||
if !contains(m.cfg.Folders()["folder1"], device2, device1) {
|
||||
@ -549,20 +542,13 @@ func TestIntroducer(t *testing.T) {
|
||||
},
|
||||
},
|
||||
})
|
||||
m.ClusterConfig(device1, protocol.ClusterConfig{
|
||||
Folders: []protocol.Folder{
|
||||
{
|
||||
ID: "folder2",
|
||||
Devices: []protocol.Device{
|
||||
{
|
||||
cc = basicClusterConfig(myID, device1, "folder2")
|
||||
cc.Folders[0].Devices = append(cc.Folders[0].Devices, protocol.Device{
|
||||
ID: device2,
|
||||
Introducer: true,
|
||||
SkipIntroductionRemovals: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
m.ClusterConfig(device1, cc)
|
||||
|
||||
// Should not get introducer, as it's already unset, and it's an existing device.
|
||||
if newDev, ok := m.cfg.Device(device2); !ok || newDev.Introducer || newDev.SkipIntroductionRemovals {
|
||||
@ -703,20 +689,13 @@ func TestIntroducer(t *testing.T) {
|
||||
},
|
||||
},
|
||||
})
|
||||
m.ClusterConfig(device1, protocol.ClusterConfig{
|
||||
Folders: []protocol.Folder{
|
||||
{
|
||||
ID: "folder2",
|
||||
Devices: []protocol.Device{
|
||||
{
|
||||
cc = basicClusterConfig(myID, device1, "folder2")
|
||||
cc.Folders[0].Devices = append(cc.Folders[0].Devices, protocol.Device{
|
||||
ID: device2,
|
||||
Introducer: true,
|
||||
SkipIntroductionRemovals: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
m.ClusterConfig(device1, cc)
|
||||
|
||||
if _, ok := m.cfg.Device(device2); !ok {
|
||||
t.Error("device 2 should not have been removed")
|
||||
|
@ -1147,3 +1147,122 @@ func TestRequestLastFileProgress(t *testing.T) {
|
||||
t.Fatal("Timed out before file was requested")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequestIndexSenderPause(t *testing.T) {
|
||||
m, fc, fcfg := setupModelWithConnection()
|
||||
tfs := fcfg.Filesystem()
|
||||
defer cleanupModelAndRemoveDir(m, tfs.URI())
|
||||
|
||||
indexChan := make(chan []protocol.FileInfo)
|
||||
fc.mut.Lock()
|
||||
fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) {
|
||||
indexChan <- fs
|
||||
}
|
||||
fc.mut.Unlock()
|
||||
|
||||
var seq int64 = 1
|
||||
files := []protocol.FileInfo{{Name: "foo", Size: 10, Version: protocol.Vector{}.Update(myID.Short()), Sequence: seq}}
|
||||
|
||||
// Both devices connected, noone paused
|
||||
localIndexUpdate(m, fcfg.ID, files)
|
||||
select {
|
||||
case <-time.After(5 * time.Second):
|
||||
l.Infoln("timeout")
|
||||
t.Fatal("timed out before receiving index")
|
||||
case <-indexChan:
|
||||
}
|
||||
|
||||
// Remote paused
|
||||
|
||||
cc := basicClusterConfig(device1, myID, fcfg.ID)
|
||||
cc.Folders[0].Paused = true
|
||||
m.ClusterConfig(device1, cc)
|
||||
|
||||
seq++
|
||||
files[0].Sequence = seq
|
||||
files[0].Version = files[0].Version.Update(myID.Short())
|
||||
localIndexUpdate(m, fcfg.ID, files)
|
||||
|
||||
// I don't see what to hook into to ensure an index update is not sent.
|
||||
dur := 50 * time.Millisecond
|
||||
if !testing.Short() {
|
||||
dur = 2 * time.Second
|
||||
}
|
||||
select {
|
||||
case <-time.After(dur):
|
||||
case <-indexChan:
|
||||
t.Error("Received index despite remote being paused")
|
||||
}
|
||||
|
||||
// Remote unpaused
|
||||
|
||||
cc.Folders[0].Paused = false
|
||||
m.ClusterConfig(device1, cc)
|
||||
select {
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("timed out before receiving index")
|
||||
case <-indexChan:
|
||||
}
|
||||
|
||||
// Local paused and resume
|
||||
|
||||
fcfg.Paused = true
|
||||
waiter, _ := m.cfg.SetFolder(fcfg)
|
||||
waiter.Wait()
|
||||
|
||||
fcfg.Paused = false
|
||||
waiter, _ = m.cfg.SetFolder(fcfg)
|
||||
waiter.Wait()
|
||||
|
||||
seq++
|
||||
files[0].Sequence = seq
|
||||
files[0].Version = files[0].Version.Update(myID.Short())
|
||||
localIndexUpdate(m, fcfg.ID, files)
|
||||
select {
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("timed out before receiving index")
|
||||
case <-indexChan:
|
||||
}
|
||||
|
||||
// Local and remote paused, then first resume remote, then local
|
||||
|
||||
cc.Folders[0].Paused = true
|
||||
m.ClusterConfig(device1, cc)
|
||||
|
||||
fcfg.Paused = true
|
||||
waiter, _ = m.cfg.SetFolder(fcfg)
|
||||
waiter.Wait()
|
||||
|
||||
cc.Folders[0].Paused = false
|
||||
m.ClusterConfig(device1, cc)
|
||||
|
||||
fcfg.Paused = false
|
||||
waiter, _ = m.cfg.SetFolder(fcfg)
|
||||
waiter.Wait()
|
||||
|
||||
seq++
|
||||
files[0].Sequence = seq
|
||||
files[0].Version = files[0].Version.Update(myID.Short())
|
||||
localIndexUpdate(m, fcfg.ID, files)
|
||||
select {
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("timed out before receiving index")
|
||||
case <-indexChan:
|
||||
}
|
||||
|
||||
// Folder removed on remote
|
||||
|
||||
cc = protocol.ClusterConfig{}
|
||||
m.ClusterConfig(device1, cc)
|
||||
|
||||
seq++
|
||||
files[0].Sequence = seq
|
||||
files[0].Version = files[0].Version.Update(myID.Short())
|
||||
localIndexUpdate(m, fcfg.ID, files)
|
||||
|
||||
select {
|
||||
case <-time.After(dur):
|
||||
case <-indexChan:
|
||||
t.Error("Received index despite remote not having the folder")
|
||||
}
|
||||
}
|
||||
|
@ -230,3 +230,41 @@ func folderIgnoresAlwaysReload(m *model, fcfg config.FolderConfiguration) {
|
||||
m.addAndStartFolderLockedWithIgnores(fcfg, fset, ignores)
|
||||
m.fmut.Unlock()
|
||||
}
|
||||
|
||||
func basicClusterConfig(local, remote protocol.DeviceID, folders ...string) protocol.ClusterConfig {
|
||||
var cc protocol.ClusterConfig
|
||||
for _, folder := range folders {
|
||||
cc.Folders = append(cc.Folders, protocol.Folder{
|
||||
ID: folder,
|
||||
Devices: []protocol.Device{
|
||||
{
|
||||
ID: local,
|
||||
},
|
||||
{
|
||||
ID: remote,
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
return cc
|
||||
}
|
||||
|
||||
func localIndexUpdate(m *model, folder string, fs []protocol.FileInfo) {
|
||||
m.fmut.RLock()
|
||||
fset := m.folderFiles[folder]
|
||||
m.fmut.RUnlock()
|
||||
|
||||
fset.Update(protocol.LocalDeviceID, fs)
|
||||
seq := fset.Sequence(protocol.LocalDeviceID)
|
||||
filenames := make([]string, len(fs))
|
||||
for i, file := range fs {
|
||||
filenames[i] = file.Name
|
||||
}
|
||||
m.evLogger.Log(events.LocalIndexUpdated, map[string]interface{}{
|
||||
"folder": folder,
|
||||
"items": len(fs),
|
||||
"filenames": filenames,
|
||||
"sequence": seq,
|
||||
"version": seq, // legacy for sequence
|
||||
})
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user