mirror of
https://github.com/octoleo/syncthing.git
synced 2025-01-22 22:58:25 +00:00
parent
cc9ea9db89
commit
d4ce0dfd84
@ -25,7 +25,6 @@ type indexSender struct {
|
||||
suture.Service
|
||||
conn protocol.Connection
|
||||
folder string
|
||||
dev string
|
||||
fset *db.FileSet
|
||||
prevSequence int64
|
||||
evLogger events.Logger
|
||||
@ -38,8 +37,8 @@ type indexSender struct {
|
||||
func (s *indexSender) serve(ctx context.Context) {
|
||||
var err error
|
||||
|
||||
l.Debugf("Starting indexSender for %s to %s at %s (slv=%d)", s.folder, s.dev, s.conn, s.prevSequence)
|
||||
defer l.Debugf("Exiting indexSender for %s to %s at %s: %v", s.folder, s.dev, s.conn, err)
|
||||
l.Debugf("Starting indexSender for %s to %s at %s (slv=%d)", s.folder, s.conn.ID(), s.conn, s.prevSequence)
|
||||
defer l.Debugf("Exiting indexSender for %s to %s at %s: %v", s.folder, s.conn.ID(), s.conn, err)
|
||||
|
||||
// We need to send one index, regardless of whether there is something to send or not
|
||||
err = s.sendIndexTo(ctx)
|
||||
@ -204,7 +203,7 @@ func (s *indexSender) sendIndexTo(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (s *indexSender) String() string {
|
||||
return fmt.Sprintf("indexSender@%p for %s to %s at %s", s, s.folder, s.dev, s.conn)
|
||||
return fmt.Sprintf("indexSender@%p for %s to %s at %s", s, s.folder, s.conn.ID(), s.conn)
|
||||
}
|
||||
|
||||
type indexSenderRegistry struct {
|
||||
@ -239,15 +238,13 @@ func (r *indexSenderRegistry) add(folder config.FolderConfiguration, fset *db.Fi
|
||||
r.mut.Unlock()
|
||||
}
|
||||
|
||||
func (r *indexSenderRegistry) addLocked(folder config.FolderConfiguration, fset *db.FileSet, startInfo *indexSenderStartInfo) {
|
||||
if is, ok := r.indexSenders[folder.ID]; ok {
|
||||
r.sup.RemoveAndWait(is.token, 0)
|
||||
delete(r.indexSenders, folder.ID)
|
||||
}
|
||||
if _, ok := r.startInfos[folder.ID]; ok {
|
||||
delete(r.startInfos, folder.ID)
|
||||
}
|
||||
func (r *indexSenderRegistry) addNew(folder config.FolderConfiguration, fset *db.FileSet) {
|
||||
r.mut.Lock()
|
||||
r.startLocked(folder.ID, fset, 0)
|
||||
r.mut.Unlock()
|
||||
}
|
||||
|
||||
func (r *indexSenderRegistry) addLocked(folder config.FolderConfiguration, fset *db.FileSet, startInfo *indexSenderStartInfo) {
|
||||
myIndexID := fset.IndexID(protocol.LocalDeviceID)
|
||||
mySequence := fset.Sequence(protocol.LocalDeviceID)
|
||||
var startSequence int64
|
||||
@ -305,10 +302,22 @@ func (r *indexSenderRegistry) addLocked(folder config.FolderConfiguration, fset
|
||||
fset.SetIndexID(r.deviceID, startInfo.remote.IndexID)
|
||||
}
|
||||
|
||||
r.startLocked(folder.ID, fset, startSequence)
|
||||
}
|
||||
|
||||
func (r *indexSenderRegistry) startLocked(folderID string, fset *db.FileSet, startSequence int64) {
|
||||
if is, ok := r.indexSenders[folderID]; ok {
|
||||
r.sup.RemoveAndWait(is.token, 0)
|
||||
delete(r.indexSenders, folderID)
|
||||
}
|
||||
if _, ok := r.startInfos[folderID]; ok {
|
||||
delete(r.startInfos, folderID)
|
||||
}
|
||||
|
||||
is := &indexSender{
|
||||
conn: r.conn,
|
||||
connClosed: r.closed,
|
||||
folder: folder.ID,
|
||||
folder: folderID,
|
||||
fset: fset,
|
||||
prevSequence: startSequence,
|
||||
evLogger: r.evLogger,
|
||||
@ -317,7 +326,7 @@ func (r *indexSenderRegistry) addLocked(folder config.FolderConfiguration, fset
|
||||
}
|
||||
is.Service = util.AsService(is.serve, is.String())
|
||||
is.token = r.sup.Add(is)
|
||||
r.indexSenders[folder.ID] = is
|
||||
r.indexSenders[folderID] = is
|
||||
}
|
||||
|
||||
// addPaused stores the given info to start an index sender once resume is called
|
||||
|
@ -478,18 +478,10 @@ func (m *model) restartFolder(from, to config.FolderConfiguration, cacheIgnoredF
|
||||
|
||||
// Cache the (maybe) existing fset before it's removed by cleanupFolderLocked
|
||||
fset := m.folderFiles[folder]
|
||||
fsetNil := fset == nil
|
||||
|
||||
m.cleanupFolderLocked(from)
|
||||
if to.Paused {
|
||||
// Care needs to be taken because we already hold fmut and the lock order
|
||||
// must be the same everywhere. As fmut is acquired first, this is fine.
|
||||
m.pmut.RLock()
|
||||
for _, r := range m.indexSenders {
|
||||
r.pause(to.ID)
|
||||
}
|
||||
m.pmut.RUnlock()
|
||||
} else {
|
||||
fsetNil := fset == nil
|
||||
if !to.Paused {
|
||||
if fsetNil {
|
||||
// Create a new fset. Might take a while and we do it under
|
||||
// locking, but it's unsafe to create fset:s concurrently so
|
||||
@ -497,16 +489,31 @@ func (m *model) restartFolder(from, to config.FolderConfiguration, cacheIgnoredF
|
||||
fset = db.NewFileSet(folder, to.Filesystem(), m.db)
|
||||
}
|
||||
m.addAndStartFolderLocked(to, fset, cacheIgnoredFiles)
|
||||
if fsetNil || from.Paused {
|
||||
for _, devID := range to.DeviceIDs() {
|
||||
indexSenders, ok := m.indexSenders[devID]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
indexSenders.resume(to, fset)
|
||||
}
|
||||
}
|
||||
|
||||
// Care needs to be taken because we already hold fmut and the lock order
|
||||
// must be the same everywhere. As fmut is acquired first, this is fine.
|
||||
// toDeviceIDs := to.DeviceIDs()
|
||||
m.pmut.RLock()
|
||||
for _, id := range to.DeviceIDs() {
|
||||
indexSenders, ok := m.indexSenders[id]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
// In case the folder was newly shared with us we already got a
|
||||
// cluster config and wont necessarily get another soon - start
|
||||
// sending indexes if connected.
|
||||
isNew := !from.SharedWith(indexSenders.deviceID)
|
||||
if isNew {
|
||||
indexSenders.addNew(to, fset)
|
||||
}
|
||||
if to.Paused {
|
||||
indexSenders.pause(to.ID)
|
||||
} else if !isNew && (fsetNil || from.Paused) {
|
||||
indexSenders.resume(to, fset)
|
||||
}
|
||||
}
|
||||
m.pmut.RUnlock()
|
||||
|
||||
var infoMsg string
|
||||
switch {
|
||||
@ -527,7 +534,24 @@ func (m *model) newFolder(cfg config.FolderConfiguration, cacheIgnoredFiles bool
|
||||
|
||||
m.fmut.Lock()
|
||||
defer m.fmut.Unlock()
|
||||
|
||||
// In case this folder is new and was shared with us we already got a
|
||||
// cluster config and wont necessarily get another soon - start sending
|
||||
// indexes if connected.
|
||||
if fset.Sequence(protocol.LocalDeviceID) == 0 {
|
||||
m.pmut.RLock()
|
||||
for _, id := range cfg.DeviceIDs() {
|
||||
if is, ok := m.indexSenders[id]; ok {
|
||||
if fset.Sequence(id) == 0 {
|
||||
is.addNew(cfg, fset)
|
||||
}
|
||||
}
|
||||
}
|
||||
m.pmut.RUnlock()
|
||||
}
|
||||
|
||||
m.addAndStartFolderLocked(cfg, fset, cacheIgnoredFiles)
|
||||
|
||||
}
|
||||
|
||||
func (m *model) UsageReportingStats(report *contract.Report, version int, preview bool) {
|
||||
@ -1060,25 +1084,22 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon
|
||||
continue
|
||||
}
|
||||
|
||||
var foundRemote, foundLocal bool
|
||||
var remoteDeviceInfo, localDeviceInfo protocol.Device
|
||||
deviceInfos := &indexSenderStartInfo{}
|
||||
for _, dev := range folder.Devices {
|
||||
if dev.ID == m.id {
|
||||
localDeviceInfo = dev
|
||||
foundLocal = true
|
||||
deviceInfos.local = dev
|
||||
} else if dev.ID == deviceID {
|
||||
remoteDeviceInfo = dev
|
||||
foundRemote = true
|
||||
deviceInfos.remote = dev
|
||||
}
|
||||
if foundRemote && foundLocal {
|
||||
if deviceInfos.local.ID != protocol.EmptyDeviceID && deviceInfos.remote.ID != protocol.EmptyDeviceID {
|
||||
break
|
||||
}
|
||||
}
|
||||
if !foundRemote {
|
||||
if deviceInfos.remote.ID == protocol.EmptyDeviceID {
|
||||
l.Infof("Device %v sent cluster-config without the device info for the remote on folder %v", deviceID, folder.Description())
|
||||
return errMissingRemoteInClusterConfig
|
||||
}
|
||||
if !foundLocal {
|
||||
if deviceInfos.local.ID == protocol.EmptyDeviceID {
|
||||
l.Infof("Device %v sent cluster-config without the device info for us locally on folder %v", deviceID, folder.Description())
|
||||
return errMissingLocalInClusterConfig
|
||||
}
|
||||
@ -1090,10 +1111,7 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon
|
||||
}
|
||||
|
||||
if cfg.Paused {
|
||||
indexSenderRegistry.addPaused(cfg, &indexSenderStartInfo{
|
||||
local: localDeviceInfo,
|
||||
remote: remoteDeviceInfo,
|
||||
})
|
||||
indexSenderRegistry.addPaused(cfg, deviceInfos)
|
||||
continue
|
||||
}
|
||||
|
||||
@ -1110,10 +1128,7 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon
|
||||
tempIndexFolders = append(tempIndexFolders, folder.ID)
|
||||
}
|
||||
|
||||
indexSenderRegistry.add(cfg, fs, &indexSenderStartInfo{
|
||||
local: localDeviceInfo,
|
||||
remote: remoteDeviceInfo,
|
||||
})
|
||||
indexSenderRegistry.add(cfg, fs, deviceInfos)
|
||||
|
||||
// We might already have files that we need to pull so let the
|
||||
// folder runner know that it should recheck the index data.
|
||||
|
Loading…
x
Reference in New Issue
Block a user