mirror of
https://github.com/octoleo/syncthing.git
synced 2024-12-23 03:18:59 +00:00
431 lines
14 KiB
Go
431 lines
14 KiB
Go
// Copyright (C) 2020 The Syncthing Authors.
|
|
//
|
|
// This Source Code Form is subject to the terms of the Mozilla Public
|
|
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
|
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
|
|
|
package model
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/thejerf/suture/v4"
|
|
|
|
"github.com/syncthing/syncthing/lib/config"
|
|
"github.com/syncthing/syncthing/lib/db"
|
|
"github.com/syncthing/syncthing/lib/events"
|
|
"github.com/syncthing/syncthing/lib/protocol"
|
|
"github.com/syncthing/syncthing/lib/svcutil"
|
|
)
|
|
|
|
type indexSender struct {
|
|
conn protocol.Connection
|
|
folder string
|
|
folderIsReceiveEncrypted bool
|
|
fset *db.FileSet
|
|
prevSequence int64
|
|
evLogger events.Logger
|
|
connClosed chan struct{}
|
|
done chan struct{}
|
|
token suture.ServiceToken
|
|
pauseChan chan struct{}
|
|
resumeChan chan *db.FileSet
|
|
}
|
|
|
|
func (s *indexSender) Serve(ctx context.Context) (err error) {
|
|
l.Debugf("Starting indexSender for %s to %s at %s (slv=%d)", s.folder, s.conn.ID(), s.conn, s.prevSequence)
|
|
defer func() {
|
|
close(s.done)
|
|
err = svcutil.NoRestartErr(err)
|
|
l.Debugf("Exiting indexSender for %s to %s at %s: %v", s.folder, s.conn.ID(), s.conn, err)
|
|
}()
|
|
|
|
// We need to send one index, regardless of whether there is something to send or not
|
|
err = s.sendIndexTo(ctx)
|
|
|
|
// Subscribe to LocalIndexUpdated (we have new information to send) and
|
|
// DeviceDisconnected (it might be us who disconnected, so we should
|
|
// exit).
|
|
sub := s.evLogger.Subscribe(events.LocalIndexUpdated | events.DeviceDisconnected)
|
|
defer sub.Unsubscribe()
|
|
|
|
paused := false
|
|
evChan := sub.C()
|
|
ticker := time.NewTicker(time.Minute)
|
|
defer ticker.Stop()
|
|
|
|
for err == nil {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-s.connClosed:
|
|
return nil
|
|
default:
|
|
}
|
|
|
|
// While we have sent a sequence at least equal to the one
|
|
// currently in the database, wait for the local index to update. The
|
|
// local index may update for other folders than the one we are
|
|
// sending for.
|
|
if s.fset.Sequence(protocol.LocalDeviceID) <= s.prevSequence {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-s.connClosed:
|
|
return nil
|
|
case <-evChan:
|
|
case <-ticker.C:
|
|
case <-s.pauseChan:
|
|
paused = true
|
|
case s.fset = <-s.resumeChan:
|
|
paused = false
|
|
}
|
|
|
|
continue
|
|
}
|
|
|
|
if !paused {
|
|
err = s.sendIndexTo(ctx)
|
|
}
|
|
|
|
// Wait a short amount of time before entering the next loop. If there
|
|
// are continuous changes happening to the local index, this gives us
|
|
// time to batch them up a little.
|
|
time.Sleep(250 * time.Millisecond)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (s *indexSender) resume(fset *db.FileSet) {
|
|
select {
|
|
case <-s.done:
|
|
case s.resumeChan <- fset:
|
|
}
|
|
}
|
|
|
|
func (s *indexSender) pause() {
|
|
select {
|
|
case <-s.done:
|
|
case s.pauseChan <- struct{}{}:
|
|
}
|
|
}
|
|
|
|
// sendIndexTo sends file infos with a sequence number higher than prevSequence and
|
|
// returns the highest sent sequence number.
|
|
func (s *indexSender) sendIndexTo(ctx context.Context) error {
|
|
initial := s.prevSequence == 0
|
|
batch := db.NewFileInfoBatch(nil)
|
|
batch.SetFlushFunc(func(fs []protocol.FileInfo) error {
|
|
l.Debugf("%v: Sending %d files (<%d bytes)", s, len(fs), batch.Size())
|
|
if initial {
|
|
initial = false
|
|
return s.conn.Index(ctx, s.folder, fs)
|
|
}
|
|
return s.conn.IndexUpdate(ctx, s.folder, fs)
|
|
})
|
|
|
|
var err error
|
|
var f protocol.FileInfo
|
|
snap, err := s.fset.Snapshot()
|
|
if err != nil {
|
|
return svcutil.AsFatalErr(err, svcutil.ExitError)
|
|
}
|
|
defer snap.Release()
|
|
previousWasDelete := false
|
|
snap.WithHaveSequence(s.prevSequence+1, func(fi protocol.FileIntf) bool {
|
|
// This is to make sure that renames (which is an add followed by a delete) land in the same batch.
|
|
// Even if the batch is full, we allow a last delete to slip in, we do this by making sure that
|
|
// the batch ends with a non-delete, or that the last item in the batch is already a delete
|
|
if batch.Full() && (!fi.IsDeleted() || previousWasDelete) {
|
|
if err = batch.Flush(); err != nil {
|
|
return false
|
|
}
|
|
}
|
|
|
|
if shouldDebug() {
|
|
if fi.SequenceNo() < s.prevSequence+1 {
|
|
panic(fmt.Sprintln("sequence lower than requested, got:", fi.SequenceNo(), ", asked to start at:", s.prevSequence+1))
|
|
}
|
|
}
|
|
|
|
if f.Sequence > 0 && fi.SequenceNo() <= f.Sequence {
|
|
l.Warnln("Non-increasing sequence detected: Checking and repairing the db...")
|
|
// Abort this round of index sending - the next one will pick
|
|
// up from the last successful one with the repeaired db.
|
|
defer func() {
|
|
if fixed, dbErr := s.fset.RepairSequence(); dbErr != nil {
|
|
l.Warnln("Failed repairing sequence entries:", dbErr)
|
|
panic("Failed repairing sequence entries")
|
|
} else {
|
|
s.evLogger.Log(events.Failure, "detected and repaired non-increasing sequence")
|
|
l.Infof("Repaired %v sequence entries in database", fixed)
|
|
}
|
|
}()
|
|
return false
|
|
}
|
|
|
|
f = fi.(protocol.FileInfo)
|
|
|
|
// If this is a folder receiving encrypted files only, we
|
|
// mustn't ever send locally changed file infos. Those aren't
|
|
// encrypted and thus would be a protocol error at the remote.
|
|
if s.folderIsReceiveEncrypted && fi.IsReceiveOnlyChanged() {
|
|
return true
|
|
}
|
|
|
|
f = prepareFileInfoForIndex(f)
|
|
|
|
previousWasDelete = f.IsDeleted()
|
|
|
|
batch.Append(f)
|
|
return true
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = batch.Flush()
|
|
|
|
// True if there was nothing to be sent
|
|
if f.Sequence == 0 {
|
|
return err
|
|
}
|
|
|
|
s.prevSequence = f.Sequence
|
|
return err
|
|
}
|
|
|
|
func prepareFileInfoForIndex(f protocol.FileInfo) protocol.FileInfo {
|
|
// Mark the file as invalid if any of the local bad stuff flags are set.
|
|
f.RawInvalid = f.IsInvalid()
|
|
// If the file is marked LocalReceive (i.e., changed locally on a
|
|
// receive only folder) we do not want it to ever become the
|
|
// globally best version, invalid or not.
|
|
if f.IsReceiveOnlyChanged() {
|
|
f.Version = protocol.Vector{}
|
|
}
|
|
// never sent externally
|
|
f.LocalFlags = 0
|
|
f.VersionHash = nil
|
|
return f
|
|
}
|
|
|
|
func (s *indexSender) String() string {
|
|
return fmt.Sprintf("indexSender@%p for %s to %s at %s", s, s.folder, s.conn.ID(), s.conn)
|
|
}
|
|
|
|
type indexSenderRegistry struct {
|
|
deviceID protocol.DeviceID
|
|
sup *suture.Supervisor
|
|
evLogger events.Logger
|
|
conn protocol.Connection
|
|
closed chan struct{}
|
|
indexSenders map[string]*indexSender
|
|
startInfos map[string]*indexSenderStartInfo
|
|
mut sync.Mutex
|
|
}
|
|
|
|
func newIndexSenderRegistry(conn protocol.Connection, closed chan struct{}, sup *suture.Supervisor, evLogger events.Logger) *indexSenderRegistry {
|
|
return &indexSenderRegistry{
|
|
deviceID: conn.ID(),
|
|
conn: conn,
|
|
closed: closed,
|
|
sup: sup,
|
|
evLogger: evLogger,
|
|
indexSenders: make(map[string]*indexSender),
|
|
startInfos: make(map[string]*indexSenderStartInfo),
|
|
mut: sync.Mutex{},
|
|
}
|
|
}
|
|
|
|
// add starts an index sender for given folder.
|
|
// If an index sender is already running, it will be stopped first.
|
|
func (r *indexSenderRegistry) add(folder config.FolderConfiguration, fset *db.FileSet, startInfo *indexSenderStartInfo) {
|
|
r.mut.Lock()
|
|
r.addLocked(folder, fset, startInfo)
|
|
l.Debugf("Started index sender for device %v and folder %v", r.deviceID.Short(), folder.ID)
|
|
r.mut.Unlock()
|
|
}
|
|
|
|
func (r *indexSenderRegistry) addLocked(folder config.FolderConfiguration, fset *db.FileSet, startInfo *indexSenderStartInfo) {
|
|
myIndexID := fset.IndexID(protocol.LocalDeviceID)
|
|
mySequence := fset.Sequence(protocol.LocalDeviceID)
|
|
var startSequence int64
|
|
|
|
// This is the other side's description of what it knows
|
|
// about us. Lets check to see if we can start sending index
|
|
// updates directly or need to send the index from start...
|
|
|
|
if startInfo.local.IndexID == myIndexID {
|
|
// They say they've seen our index ID before, so we can
|
|
// send a delta update only.
|
|
|
|
if startInfo.local.MaxSequence > mySequence {
|
|
// Safety check. They claim to have more or newer
|
|
// index data than we have - either we have lost
|
|
// index data, or reset the index without resetting
|
|
// the IndexID, or something else weird has
|
|
// happened. We send a full index to reset the
|
|
// situation.
|
|
l.Infof("Device %v folder %s is delta index compatible, but seems out of sync with reality", r.deviceID, folder.Description())
|
|
startSequence = 0
|
|
} else {
|
|
l.Debugf("Device %v folder %s is delta index compatible (mlv=%d)", r.deviceID, folder.Description(), startInfo.local.MaxSequence)
|
|
startSequence = startInfo.local.MaxSequence
|
|
}
|
|
} else if startInfo.local.IndexID != 0 {
|
|
// They say they've seen an index ID from us, but it's
|
|
// not the right one. Either they are confused or we
|
|
// must have reset our database since last talking to
|
|
// them. We'll start with a full index transfer.
|
|
l.Infof("Device %v folder %s has mismatching index ID for us (%v != %v)", r.deviceID, folder.Description(), startInfo.local.IndexID, myIndexID)
|
|
startSequence = 0
|
|
} else {
|
|
l.Debugf("Device %v folder %s has no index ID for us", r.deviceID, folder.Description())
|
|
}
|
|
|
|
// This is the other side's description of themselves. We
|
|
// check to see that it matches the IndexID we have on file,
|
|
// otherwise we drop our old index data and expect to get a
|
|
// completely new set.
|
|
|
|
theirIndexID := fset.IndexID(r.deviceID)
|
|
if startInfo.remote.IndexID == 0 {
|
|
// They're not announcing an index ID. This means they
|
|
// do not support delta indexes and we should clear any
|
|
// information we have from them before accepting their
|
|
// index, which will presumably be a full index.
|
|
l.Debugf("Device %v folder %s does not announce an index ID", r.deviceID, folder.Description())
|
|
fset.Drop(r.deviceID)
|
|
} else if startInfo.remote.IndexID != theirIndexID {
|
|
// The index ID we have on file is not what they're
|
|
// announcing. They must have reset their database and
|
|
// will probably send us a full index. We drop any
|
|
// information we have and remember this new index ID
|
|
// instead.
|
|
l.Infof("Device %v folder %s has a new index ID (%v)", r.deviceID, folder.Description(), startInfo.remote.IndexID)
|
|
fset.Drop(r.deviceID)
|
|
fset.SetIndexID(r.deviceID, startInfo.remote.IndexID)
|
|
}
|
|
|
|
if is, ok := r.indexSenders[folder.ID]; ok {
|
|
r.sup.RemoveAndWait(is.token, 0)
|
|
delete(r.indexSenders, folder.ID)
|
|
}
|
|
delete(r.startInfos, folder.ID)
|
|
|
|
is := &indexSender{
|
|
conn: r.conn,
|
|
connClosed: r.closed,
|
|
done: make(chan struct{}),
|
|
folder: folder.ID,
|
|
folderIsReceiveEncrypted: folder.Type == config.FolderTypeReceiveEncrypted,
|
|
fset: fset,
|
|
prevSequence: startSequence,
|
|
evLogger: r.evLogger,
|
|
pauseChan: make(chan struct{}),
|
|
resumeChan: make(chan *db.FileSet),
|
|
}
|
|
is.token = r.sup.Add(is)
|
|
r.indexSenders[folder.ID] = is
|
|
}
|
|
|
|
// addPending stores the given info to start an index sender once resume is called
|
|
// for this folder.
|
|
// If an index sender is already running, it will be stopped.
|
|
func (r *indexSenderRegistry) addPending(folder string, startInfo *indexSenderStartInfo) {
|
|
r.mut.Lock()
|
|
defer r.mut.Unlock()
|
|
|
|
if is, ok := r.indexSenders[folder]; ok {
|
|
r.sup.RemoveAndWait(is.token, 0)
|
|
delete(r.indexSenders, folder)
|
|
l.Debugf("Removed index sender for device %v and folder %v due to added pending", r.deviceID.Short(), folder)
|
|
}
|
|
r.startInfos[folder] = startInfo
|
|
l.Debugf("Pending index sender for device %v and folder %v", r.deviceID.Short(), folder)
|
|
}
|
|
|
|
// remove stops a running index sender or removes one pending to be started.
|
|
// It is a noop if the folder isn't known.
|
|
func (r *indexSenderRegistry) remove(folder string) {
|
|
r.mut.Lock()
|
|
defer r.mut.Unlock()
|
|
|
|
if is, ok := r.indexSenders[folder]; ok {
|
|
r.sup.RemoveAndWait(is.token, 0)
|
|
delete(r.indexSenders, folder)
|
|
}
|
|
delete(r.startInfos, folder)
|
|
l.Debugf("Removed index sender for device %v and folder %v", r.deviceID.Short(), folder)
|
|
}
|
|
|
|
// removeAllExcept stops all running index senders and removes those pending to be started,
|
|
// except mentioned ones.
|
|
// It is a noop if the folder isn't known.
|
|
func (r *indexSenderRegistry) removeAllExcept(except map[string]struct{}) {
|
|
r.mut.Lock()
|
|
defer r.mut.Unlock()
|
|
|
|
for folder, is := range r.indexSenders {
|
|
if _, ok := except[folder]; !ok {
|
|
r.sup.RemoveAndWait(is.token, 0)
|
|
delete(r.indexSenders, folder)
|
|
l.Debugf("Removed index sender for device %v and folder %v (removeAllExcept)", r.deviceID.Short(), folder)
|
|
}
|
|
}
|
|
for folder := range r.startInfos {
|
|
if _, ok := except[folder]; !ok {
|
|
delete(r.startInfos, folder)
|
|
l.Debugf("Removed pending index sender for device %v and folder %v (removeAllExcept)", r.deviceID.Short(), folder)
|
|
}
|
|
}
|
|
}
|
|
|
|
// pause stops a running index sender.
|
|
// It is a noop if the folder isn't known or has not been started yet.
|
|
func (r *indexSenderRegistry) pause(folder string) {
|
|
r.mut.Lock()
|
|
defer r.mut.Unlock()
|
|
|
|
if is, ok := r.indexSenders[folder]; ok {
|
|
is.pause()
|
|
l.Debugf("Paused index sender for device %v and folder %v", r.deviceID.Short(), folder)
|
|
} else {
|
|
l.Debugf("No index sender for device %v and folder %v to pause", r.deviceID.Short(), folder)
|
|
}
|
|
}
|
|
|
|
// resume unpauses an already running index sender or starts it, if it was added
|
|
// while paused.
|
|
// It is a noop if the folder isn't known.
|
|
func (r *indexSenderRegistry) resume(folder config.FolderConfiguration, fset *db.FileSet) {
|
|
r.mut.Lock()
|
|
defer r.mut.Unlock()
|
|
|
|
is, isOk := r.indexSenders[folder.ID]
|
|
if info, ok := r.startInfos[folder.ID]; ok {
|
|
if isOk {
|
|
r.sup.RemoveAndWait(is.token, 0)
|
|
delete(r.indexSenders, folder.ID)
|
|
l.Debugf("Removed index sender for device %v and folder %v in resume", r.deviceID.Short(), folder.ID)
|
|
}
|
|
r.addLocked(folder, fset, info)
|
|
delete(r.startInfos, folder.ID)
|
|
l.Debugf("Started index sender for device %v and folder %v in resume", r.deviceID.Short(), folder.ID)
|
|
} else if isOk {
|
|
is.resume(fset)
|
|
l.Debugf("Resume index sender for device %v and folder %v", r.deviceID.Short(), folder.ID)
|
|
} else {
|
|
l.Debugf("Not resuming index sender for device %v and folder %v as none is paused and there is no start info", r.deviceID.Short(), folder.ID)
|
|
}
|
|
}
|
|
|
|
type indexSenderStartInfo struct {
|
|
local, remote protocol.Device
|
|
}
|