2021-06-03 12:58:50 +00:00
// Copyright (C) 2020 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package model
import (
"context"
"fmt"
"sync"
"time"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/db"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/svcutil"
2024-08-28 13:00:19 +00:00
"github.com/syncthing/syncthing/lib/ur"
2021-06-03 12:58:50 +00:00
)
type indexHandler struct {
conn protocol . Connection
downloads * deviceDownloadState
folder string
folderIsReceiveEncrypted bool
evLogger events . Logger
2024-09-29 22:18:24 +00:00
// We track the latest / highest sequence number in two ways for two
// different reasons. Initially they are the same -- the highest seen
// sequence number reported by the other side (or zero).
//
// One is the highest number we've seen when iterating the database,
// which we track for database iteration purposes. When we loop, we
// start looking at that number plus one in the next loop. Our index
// numbering may have holes which this will skip over.
//
// The other is the highest sequence we previously sent to the other
// side, used by them for correctness checks. This one must not skip
// holes. That is, if we iterate and find a hole, this is not
// incremented because nothing was sent to the other side.
localPrevSequence int64 // the highest sequence number we've seen in our FileInfos
sentPrevSequence int64 // the highest sequence number we've sent to the peer
2021-06-03 12:58:50 +00:00
cond * sync . Cond
paused bool
fset * db . FileSet
runner service
}
func newIndexHandler ( conn protocol . Connection , downloads * deviceDownloadState , folder config . FolderConfiguration , fset * db . FileSet , runner service , startInfo * clusterConfigDeviceInfo , evLogger events . Logger ) * indexHandler {
myIndexID := fset . IndexID ( protocol . LocalDeviceID )
mySequence := fset . Sequence ( protocol . LocalDeviceID )
var startSequence int64
// This is the other side's description of what it knows
// about us. Lets check to see if we can start sending index
// updates directly or need to send the index from start...
if startInfo . local . IndexID == myIndexID {
// They say they've seen our index ID before, so we can
// send a delta update only.
if startInfo . local . MaxSequence > mySequence {
// Safety check. They claim to have more or newer
// index data than we have - either we have lost
// index data, or reset the index without resetting
// the IndexID, or something else weird has
// happened. We send a full index to reset the
// situation.
2023-07-29 08:24:44 +00:00
l . Infof ( "Device %v folder %s is delta index compatible, but seems out of sync with reality" , conn . DeviceID ( ) . Short ( ) , folder . Description ( ) )
2021-06-03 12:58:50 +00:00
startSequence = 0
} else {
2023-07-29 08:24:44 +00:00
l . Debugf ( "Device %v folder %s is delta index compatible (mlv=%d)" , conn . DeviceID ( ) . Short ( ) , folder . Description ( ) , startInfo . local . MaxSequence )
2021-06-03 12:58:50 +00:00
startSequence = startInfo . local . MaxSequence
}
} else if startInfo . local . IndexID != 0 {
// They say they've seen an index ID from us, but it's
// not the right one. Either they are confused or we
// must have reset our database since last talking to
// them. We'll start with a full index transfer.
2023-07-29 08:24:44 +00:00
l . Infof ( "Device %v folder %s has mismatching index ID for us (%v != %v)" , conn . DeviceID ( ) . Short ( ) , folder . Description ( ) , startInfo . local . IndexID , myIndexID )
2021-06-03 12:58:50 +00:00
startSequence = 0
} else {
2023-07-29 08:24:44 +00:00
l . Debugf ( "Device %v folder %s has no index ID for us" , conn . DeviceID ( ) . Short ( ) , folder . Description ( ) )
2021-06-03 12:58:50 +00:00
}
// This is the other side's description of themselves. We
// check to see that it matches the IndexID we have on file,
// otherwise we drop our old index data and expect to get a
// completely new set.
2023-07-29 08:24:44 +00:00
theirIndexID := fset . IndexID ( conn . DeviceID ( ) )
2021-06-03 12:58:50 +00:00
if startInfo . remote . IndexID == 0 {
// They're not announcing an index ID. This means they
// do not support delta indexes and we should clear any
// information we have from them before accepting their
// index, which will presumably be a full index.
2023-07-29 08:24:44 +00:00
l . Debugf ( "Device %v folder %s does not announce an index ID" , conn . DeviceID ( ) . Short ( ) , folder . Description ( ) )
fset . Drop ( conn . DeviceID ( ) )
2021-06-03 12:58:50 +00:00
} else if startInfo . remote . IndexID != theirIndexID {
// The index ID we have on file is not what they're
// announcing. They must have reset their database and
// will probably send us a full index. We drop any
// information we have and remember this new index ID
// instead.
2023-07-29 08:24:44 +00:00
l . Infof ( "Device %v folder %s has a new index ID (%v)" , conn . DeviceID ( ) . Short ( ) , folder . Description ( ) , startInfo . remote . IndexID )
fset . Drop ( conn . DeviceID ( ) )
fset . SetIndexID ( conn . DeviceID ( ) , startInfo . remote . IndexID )
2021-06-03 12:58:50 +00:00
}
return & indexHandler {
conn : conn ,
downloads : downloads ,
folder : folder . ID ,
folderIsReceiveEncrypted : folder . Type == config . FolderTypeReceiveEncrypted ,
2024-09-29 22:18:24 +00:00
localPrevSequence : startSequence ,
sentPrevSequence : startSequence ,
2021-06-03 12:58:50 +00:00
evLogger : evLogger ,
fset : fset ,
runner : runner ,
cond : sync . NewCond ( new ( sync . Mutex ) ) ,
}
}
2021-07-30 12:41:00 +00:00
// waitForFileset blocks while the handler is paused and returns the current
// fileset once it is not. If the context is found to be done while the
// handler is still paused, the context error is returned instead. The
// context is only re-checked when the condition variable is signalled.
func (s *indexHandler) waitForFileset(ctx context.Context) (*db.FileSet, error) {
	s.cond.L.Lock()
	defer s.cond.L.Unlock()

	for s.paused {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return nil, ctxErr
		}
		s.cond.Wait()
	}
	return s.fset, nil
}
2021-06-03 12:58:50 +00:00
func ( s * indexHandler ) Serve ( ctx context . Context ) ( err error ) {
2024-09-29 22:18:24 +00:00
l . Debugf ( "Starting index handler for %s to %s at %s (localPrevSequence=%d)" , s . folder , s . conn . DeviceID ( ) . Short ( ) , s . conn , s . localPrevSequence )
2021-07-30 12:41:00 +00:00
stop := make ( chan struct { } )
2021-06-03 12:58:50 +00:00
defer func ( ) {
err = svcutil . NoRestartErr ( err )
2023-07-29 08:24:44 +00:00
l . Debugf ( "Exiting index handler for %s to %s at %s: %v" , s . folder , s . conn . DeviceID ( ) . Short ( ) , s . conn , err )
2021-07-30 12:41:00 +00:00
close ( stop )
} ( )
// Broadcast the pause cond when the context quits
go func ( ) {
select {
case <- ctx . Done ( ) :
s . cond . Broadcast ( )
case <- stop :
}
2021-06-03 12:58:50 +00:00
} ( )
// We need to send one index, regardless of whether there is something to send or not
2021-07-30 12:41:00 +00:00
fset , err := s . waitForFileset ( ctx )
if err != nil {
return err
2021-06-03 12:58:50 +00:00
}
err = s . sendIndexTo ( ctx , fset )
// Subscribe to LocalIndexUpdated (we have new information to send) and
// DeviceDisconnected (it might be us who disconnected, so we should
// exit).
sub := s . evLogger . Subscribe ( events . LocalIndexUpdated | events . DeviceDisconnected )
defer sub . Unsubscribe ( )
evChan := sub . C ( )
ticker := time . NewTicker ( time . Minute )
defer ticker . Stop ( )
for err == nil {
2021-07-30 12:41:00 +00:00
fset , err = s . waitForFileset ( ctx )
if err != nil {
return err
2021-06-03 12:58:50 +00:00
}
// While we have sent a sequence at least equal to the one
// currently in the database, wait for the local index to update. The
// local index may update for other folders than the one we are
// sending for.
2024-09-29 22:18:24 +00:00
if fset . Sequence ( protocol . LocalDeviceID ) <= s . localPrevSequence {
2021-06-03 12:58:50 +00:00
select {
case <- ctx . Done ( ) :
return ctx . Err ( )
case <- evChan :
case <- ticker . C :
}
continue
}
err = s . sendIndexTo ( ctx , fset )
// Wait a short amount of time before entering the next loop. If there
// are continuous changes happening to the local index, this gives us
// time to batch them up a little.
select {
case <- ctx . Done ( ) :
return ctx . Err ( )
case <- time . After ( 250 * time . Millisecond ) :
}
}
return err
}
2021-06-17 06:57:24 +00:00
// resume might be called because the folder was actually resumed, or just
// because the folder config changed (and thus the runner and potentially fset).
2021-06-03 12:58:50 +00:00
func ( s * indexHandler ) resume ( fset * db . FileSet , runner service ) {
s . cond . L . Lock ( )
s . paused = false
s . fset = fset
s . runner = runner
2021-07-30 12:41:00 +00:00
s . cond . Broadcast ( )
2021-06-03 12:58:50 +00:00
s . cond . L . Unlock ( )
}
func ( s * indexHandler ) pause ( ) {
s . cond . L . Lock ( )
if s . paused {
s . evLogger . Log ( events . Failure , "index handler got paused while already paused" )
}
s . paused = true
s . fset = nil
s . runner = nil
2021-07-30 12:41:00 +00:00
s . cond . Broadcast ( )
2021-06-03 12:58:50 +00:00
s . cond . L . Unlock ( )
}
// sendIndexTo sends file infos with a sequence number higher than prevSequence and
// returns the highest sent sequence number.
func ( s * indexHandler ) sendIndexTo ( ctx context . Context , fset * db . FileSet ) error {
2024-09-29 22:18:24 +00:00
initial := s . localPrevSequence == 0
2021-06-03 12:58:50 +00:00
batch := db . NewFileInfoBatch ( nil )
2024-08-28 13:00:19 +00:00
var batchError error
2021-06-03 12:58:50 +00:00
batch . SetFlushFunc ( func ( fs [ ] protocol . FileInfo ) error {
2024-09-15 09:37:49 +00:00
select {
case <- ctx . Done ( ) :
return ctx . Err ( )
default :
}
2024-08-28 13:00:19 +00:00
if len ( fs ) == 0 {
// can't happen, flush is not called with an empty batch
panic ( "bug: flush called with empty batch (race condition?)" )
}
if batchError != nil {
// can't happen, once an error is returned the index sender exits
panic ( fmt . Sprintf ( "bug: once failed it should stay failed (%v)" , batchError ) )
}
2021-06-03 12:58:50 +00:00
l . Debugf ( "%v: Sending %d files (<%d bytes)" , s , len ( fs ) , batch . Size ( ) )
2024-08-28 13:00:19 +00:00
lastSequence := fs [ len ( fs ) - 1 ] . Sequence
var err error
2021-06-03 12:58:50 +00:00
if initial {
initial = false
2024-08-28 13:00:19 +00:00
err = s . conn . Index ( ctx , & protocol . Index {
Folder : s . folder ,
Files : fs ,
LastSequence : lastSequence ,
} )
} else {
err = s . conn . IndexUpdate ( ctx , & protocol . IndexUpdate {
Folder : s . folder ,
Files : fs ,
2024-09-29 22:18:24 +00:00
PrevSequence : s . sentPrevSequence ,
2024-08-28 13:00:19 +00:00
LastSequence : lastSequence ,
} )
}
if err != nil {
batchError = err
return err
2021-06-03 12:58:50 +00:00
}
2024-09-29 22:18:24 +00:00
s . sentPrevSequence = lastSequence
2024-08-28 13:00:19 +00:00
return nil
2021-06-03 12:58:50 +00:00
} )
var err error
var f protocol . FileInfo
snap , err := fset . Snapshot ( )
if err != nil {
return svcutil . AsFatalErr ( err , svcutil . ExitError )
}
defer snap . Release ( )
previousWasDelete := false
2024-09-29 22:18:24 +00:00
snap . WithHaveSequence ( s . localPrevSequence + 1 , func ( fi protocol . FileIntf ) bool {
2021-06-03 12:58:50 +00:00
// This is to make sure that renames (which is an add followed by a delete) land in the same batch.
// Even if the batch is full, we allow a last delete to slip in, we do this by making sure that
// the batch ends with a non-delete, or that the last item in the batch is already a delete
if batch . Full ( ) && ( ! fi . IsDeleted ( ) || previousWasDelete ) {
if err = batch . Flush ( ) ; err != nil {
return false
}
}
2024-09-29 22:18:24 +00:00
if fi . SequenceNo ( ) < s . localPrevSequence + 1 {
2024-08-28 13:00:19 +00:00
s . logSequenceAnomaly ( "database returned sequence lower than requested" , map [ string ] any {
"sequence" : fi . SequenceNo ( ) ,
2024-09-29 22:18:24 +00:00
"start" : s . localPrevSequence + 1 ,
2024-08-28 13:00:19 +00:00
} )
2021-06-03 12:58:50 +00:00
}
if f . Sequence > 0 && fi . SequenceNo ( ) <= f . Sequence {
2024-08-28 13:00:19 +00:00
s . logSequenceAnomaly ( "database returned non-increasing sequence" , map [ string ] any {
"sequence" : fi . SequenceNo ( ) ,
2024-09-29 22:18:24 +00:00
"start" : s . localPrevSequence + 1 ,
2024-08-28 13:00:19 +00:00
"previous" : f . Sequence ,
} )
2021-06-03 12:58:50 +00:00
// Abort this round of index sending - the next one will pick
// up from the last successful one with the repeaired db.
defer func ( ) {
if fixed , dbErr := fset . RepairSequence ( ) ; dbErr != nil {
l . Warnln ( "Failed repairing sequence entries:" , dbErr )
panic ( "Failed repairing sequence entries" )
} else {
s . evLogger . Log ( events . Failure , "detected and repaired non-increasing sequence" )
l . Infof ( "Repaired %v sequence entries in database" , fixed )
}
} ( )
return false
}
f = fi . ( protocol . FileInfo )
// If this is a folder receiving encrypted files only, we
// mustn't ever send locally changed file infos. Those aren't
// encrypted and thus would be a protocol error at the remote.
if s . folderIsReceiveEncrypted && fi . IsReceiveOnlyChanged ( ) {
return true
}
f = prepareFileInfoForIndex ( f )
previousWasDelete = f . IsDeleted ( )
batch . Append ( f )
return true
} )
if err != nil {
return err
}
2024-03-11 06:30:21 +00:00
if err := batch . Flush ( ) ; err != nil {
return err
}
2021-06-03 12:58:50 +00:00
2024-03-10 21:28:40 +00:00
// Use the sequence of the snapshot we iterated as a starting point for the
// next run. Previously we used the sequence of the last file we sent,
// however it's possible that a higher sequence exists, just doesn't need to
// be sent (e.g. in a receive-only folder, when a local change was
// reverted). No point trying to send nothing again.
2024-09-29 22:18:24 +00:00
s . localPrevSequence = snap . Sequence ( protocol . LocalDeviceID )
2021-06-03 12:58:50 +00:00
2024-03-11 06:30:21 +00:00
return nil
2021-06-03 12:58:50 +00:00
}
2024-08-28 13:00:19 +00:00
func ( s * indexHandler ) receive ( fs [ ] protocol . FileInfo , update bool , op string , prevSequence , lastSequence int64 ) error {
2023-07-29 08:24:44 +00:00
deviceID := s . conn . DeviceID ( )
2021-06-03 12:58:50 +00:00
s . cond . L . Lock ( )
paused := s . paused
fset := s . fset
runner := s . runner
s . cond . L . Unlock ( )
if paused {
l . Infof ( "%v for paused folder %q" , op , s . folder )
return fmt . Errorf ( "%v: %w" , s . folder , ErrFolderPaused )
}
defer runner . SchedulePull ( )
s . downloads . Update ( s . folder , makeForgetUpdate ( fs ) )
if ! update {
fset . Drop ( deviceID )
}
2024-08-28 13:00:19 +00:00
l . Debugf ( "Received %d files for %s from %s, prevSeq=%d, lastSeq=%d" , len ( fs ) , s . folder , deviceID . Short ( ) , prevSequence , lastSequence )
// Verify that the previous sequence number matches what we expected
if exp := fset . Sequence ( deviceID ) ; prevSequence > 0 && prevSequence != exp {
s . logSequenceAnomaly ( "index update with unexpected sequence" , map [ string ] any {
"prevSeq" : prevSequence ,
"lastSeq" : lastSequence ,
"batch" : len ( fs ) ,
"expectedPrev" : exp ,
} )
}
2021-06-03 12:58:50 +00:00
for i := range fs {
2024-08-28 13:00:19 +00:00
// Verify index in relation to the claimed sequence boundaries
if fs [ i ] . Sequence < prevSequence {
s . logSequenceAnomaly ( "file with sequence before prevSequence" , map [ string ] any {
"prevSeq" : prevSequence ,
"lastSeq" : lastSequence ,
"batch" : len ( fs ) ,
"seenSeq" : fs [ i ] . Sequence ,
"atIndex" : i ,
} )
}
if lastSequence > 0 && fs [ i ] . Sequence > lastSequence {
s . logSequenceAnomaly ( "file with sequence after lastSequence" , map [ string ] any {
"prevSeq" : prevSequence ,
"lastSeq" : lastSequence ,
"batch" : len ( fs ) ,
"seenSeq" : fs [ i ] . Sequence ,
"atIndex" : i ,
} )
}
if i > 0 && fs [ i ] . Sequence <= fs [ i - 1 ] . Sequence {
s . logSequenceAnomaly ( "index update with non-increasing sequence" , map [ string ] any {
"prevSeq" : prevSequence ,
"lastSeq" : lastSequence ,
"batch" : len ( fs ) ,
"seenSeq" : fs [ i ] . Sequence ,
"atIndex" : i ,
"precedingSeq" : fs [ i - 1 ] . Sequence ,
} )
}
2021-06-03 12:58:50 +00:00
// The local attributes should never be transmitted over the wire.
// Make sure they look like they weren't.
fs [ i ] . LocalFlags = 0
fs [ i ] . VersionHash = nil
}
2024-08-28 13:00:19 +00:00
// Verify the claimed last sequence number
if lastSequence > 0 && len ( fs ) > 0 && lastSequence != fs [ len ( fs ) - 1 ] . Sequence {
s . logSequenceAnomaly ( "index update with unexpected last sequence" , map [ string ] any {
"prevSeq" : prevSequence ,
"lastSeq" : lastSequence ,
"batch" : len ( fs ) ,
"seenSeq" : fs [ len ( fs ) - 1 ] . Sequence ,
} )
}
2021-06-03 12:58:50 +00:00
fset . Update ( deviceID , fs )
seq := fset . Sequence ( deviceID )
2024-09-29 15:04:06 +00:00
// Check that the sequence we get back is what we put in...
2024-11-14 19:59:34 +00:00
if lastSequence > 0 && len ( fs ) > 0 && seq != lastSequence {
2024-09-29 15:04:06 +00:00
s . logSequenceAnomaly ( "unexpected sequence after update" , map [ string ] any {
"prevSeq" : prevSequence ,
"lastSeq" : lastSequence ,
"batch" : len ( fs ) ,
"seenSeq" : fs [ len ( fs ) - 1 ] . Sequence ,
"returnedSeq" : seq ,
} )
}
2021-06-03 12:58:50 +00:00
s . evLogger . Log ( events . RemoteIndexUpdated , map [ string ] interface { } {
"device" : deviceID . String ( ) ,
"folder" : s . folder ,
"items" : len ( fs ) ,
"sequence" : seq ,
"version" : seq , // legacy for sequence
} )
return nil
}
2024-08-28 13:00:19 +00:00
// logSequenceAnomaly emits a failure event with the given description.
// The extra values are attached to the event, each rendered as a string
// with fmt.Sprint.
func (s *indexHandler) logSequenceAnomaly(msg string, extra map[string]any) {
	details := make(map[string]string, len(extra))
	for key, value := range extra {
		details[key] = fmt.Sprint(value)
	}

	failure := ur.FailureData{
		Description: msg,
		Extra:       details,
	}
	s.evLogger.Log(events.Failure, failure)
}
2021-06-03 12:58:50 +00:00
func prepareFileInfoForIndex ( f protocol . FileInfo ) protocol . FileInfo {
// Mark the file as invalid if any of the local bad stuff flags are set.
f . RawInvalid = f . IsInvalid ( )
// If the file is marked LocalReceive (i.e., changed locally on a
// receive only folder) we do not want it to ever become the
// globally best version, invalid or not.
if f . IsReceiveOnlyChanged ( ) {
f . Version = protocol . Vector { }
}
2023-03-10 13:14:14 +00:00
// The trailer with the encrypted fileinfo is device local, don't send info
// about that to remotes
f . Size -= int64 ( f . EncryptionTrailerSize )
f . EncryptionTrailerSize = 0
2021-06-03 12:58:50 +00:00
// never sent externally
f . LocalFlags = 0
f . VersionHash = nil
2022-09-14 07:50:55 +00:00
f . InodeChangeNs = 0
2021-06-03 12:58:50 +00:00
return f
}
func ( s * indexHandler ) String ( ) string {
2023-07-29 08:24:44 +00:00
return fmt . Sprintf ( "indexHandler@%p for %s to %s at %s" , s , s . folder , s . conn . DeviceID ( ) . Short ( ) , s . conn )
2021-06-03 12:58:50 +00:00
}
type indexHandlerRegistry struct {
evLogger events . Logger
conn protocol . Connection
downloads * deviceDownloadState
2023-08-21 16:39:13 +00:00
indexHandlers * serviceMap [ string , * indexHandler ]
2021-06-03 12:58:50 +00:00
startInfos map [ string ] * clusterConfigDeviceInfo
folderStates map [ string ] * indexHandlerFolderState
mut sync . Mutex
}
// indexHandlerFolderState is what the registry remembers about a running
// folder, so that an index handler can be started for it later on.
type indexHandlerFolderState struct {
	cfg    config.FolderConfiguration // folder configuration as registered
	fset   *db.FileSet                // fileset of the folder
	runner service                    // folder runner
}
2023-08-21 16:39:13 +00:00
func newIndexHandlerRegistry ( conn protocol . Connection , downloads * deviceDownloadState , evLogger events . Logger ) * indexHandlerRegistry {
2021-06-03 12:58:50 +00:00
r := & indexHandlerRegistry {
2023-08-21 16:39:13 +00:00
evLogger : evLogger ,
2021-06-03 12:58:50 +00:00
conn : conn ,
downloads : downloads ,
2023-08-21 16:39:13 +00:00
indexHandlers : newServiceMap [ string , * indexHandler ] ( evLogger ) ,
2021-06-03 12:58:50 +00:00
startInfos : make ( map [ string ] * clusterConfigDeviceInfo ) ,
folderStates : make ( map [ string ] * indexHandlerFolderState ) ,
mut : sync . Mutex { } ,
}
return r
}
func ( r * indexHandlerRegistry ) String ( ) string {
2023-07-29 08:24:44 +00:00
return fmt . Sprintf ( "indexHandlerRegistry/%v" , r . conn . DeviceID ( ) . Short ( ) )
2021-06-03 12:58:50 +00:00
}
2023-08-21 16:39:13 +00:00
func ( r * indexHandlerRegistry ) Serve ( ctx context . Context ) error {
// Running the index handler registry means running the individual index
// handler children.
return r . indexHandlers . Serve ( ctx )
2021-06-03 12:58:50 +00:00
}
func ( r * indexHandlerRegistry ) startLocked ( folder config . FolderConfiguration , fset * db . FileSet , runner service , startInfo * clusterConfigDeviceInfo ) {
2023-08-21 16:39:13 +00:00
r . indexHandlers . RemoveAndWait ( folder . ID , 0 )
2021-06-03 12:58:50 +00:00
delete ( r . startInfos , folder . ID )
is := newIndexHandler ( r . conn , r . downloads , folder , fset , runner , startInfo , r . evLogger )
2023-08-21 16:39:13 +00:00
r . indexHandlers . Add ( folder . ID , is )
2021-10-20 16:55:22 +00:00
// This new connection might help us get in sync.
runner . SchedulePull ( )
2021-06-03 12:58:50 +00:00
}
// AddIndexInfo starts an index handler for given folder, unless it is paused.
// If it is paused, the given startInfo is stored to start the sender once the
// folder is resumed.
// If an index handler is already running, it will be stopped first.
func ( r * indexHandlerRegistry ) AddIndexInfo ( folder string , startInfo * clusterConfigDeviceInfo ) {
r . mut . Lock ( )
defer r . mut . Unlock ( )
2023-09-02 14:42:46 +00:00
if r . indexHandlers . RemoveAndWait ( folder , 0 ) == nil {
2023-07-29 08:24:44 +00:00
l . Debugf ( "Removed index sender for device %v and folder %v due to added pending" , r . conn . DeviceID ( ) . Short ( ) , folder )
2021-06-03 12:58:50 +00:00
}
folderState , ok := r . folderStates [ folder ]
if ! ok {
2023-07-29 08:24:44 +00:00
l . Debugf ( "Pending index handler for device %v and folder %v" , r . conn . DeviceID ( ) . Short ( ) , folder )
2021-06-03 12:58:50 +00:00
r . startInfos [ folder ] = startInfo
return
}
r . startLocked ( folderState . cfg , folderState . fset , folderState . runner , startInfo )
}
// Remove stops a running index handler or removes one pending to be started.
// It is a noop if the folder isn't known.
func ( r * indexHandlerRegistry ) Remove ( folder string ) {
r . mut . Lock ( )
defer r . mut . Unlock ( )
2023-07-29 08:24:44 +00:00
l . Debugf ( "Removing index handler for device %v and folder %v" , r . conn . DeviceID ( ) . Short ( ) , folder )
2023-08-21 16:39:13 +00:00
r . indexHandlers . RemoveAndWait ( folder , 0 )
2021-06-03 12:58:50 +00:00
delete ( r . startInfos , folder )
2023-07-29 08:24:44 +00:00
l . Debugf ( "Removed index handler for device %v and folder %v" , r . conn . DeviceID ( ) . Short ( ) , folder )
2021-06-03 12:58:50 +00:00
}
// RemoveAllExcept stops all running index handlers and removes those pending to be started,
// except mentioned ones.
// It is a noop if the folder isn't known.
2022-04-10 20:47:57 +00:00
func ( r * indexHandlerRegistry ) RemoveAllExcept ( except map [ string ] remoteFolderState ) {
2021-06-03 12:58:50 +00:00
r . mut . Lock ( )
defer r . mut . Unlock ( )
2023-09-02 14:42:46 +00:00
r . indexHandlers . Each ( func ( folder string , is * indexHandler ) error {
2021-06-03 12:58:50 +00:00
if _ , ok := except [ folder ] ; ! ok {
2023-08-21 16:39:13 +00:00
r . indexHandlers . RemoveAndWait ( folder , 0 )
2023-07-29 08:24:44 +00:00
l . Debugf ( "Removed index handler for device %v and folder %v (removeAllExcept)" , r . conn . DeviceID ( ) . Short ( ) , folder )
2021-06-03 12:58:50 +00:00
}
2023-09-02 14:42:46 +00:00
return nil
2023-08-21 16:39:13 +00:00
} )
2021-06-03 12:58:50 +00:00
for folder := range r . startInfos {
if _ , ok := except [ folder ] ; ! ok {
delete ( r . startInfos , folder )
2023-07-29 08:24:44 +00:00
l . Debugf ( "Removed pending index handler for device %v and folder %v (removeAllExcept)" , r . conn . DeviceID ( ) . Short ( ) , folder )
2021-06-03 12:58:50 +00:00
}
}
}
// RegisterFolderState must be called whenever something about the folder
// changes. The exception being if the folder is removed entirely, then call
// Remove. The fset and runner arguments may be nil, if given folder is paused.
func ( r * indexHandlerRegistry ) RegisterFolderState ( folder config . FolderConfiguration , fset * db . FileSet , runner service ) {
2023-07-29 08:24:44 +00:00
if ! folder . SharedWith ( r . conn . DeviceID ( ) ) {
2021-06-03 12:58:50 +00:00
r . Remove ( folder . ID )
return
}
r . mut . Lock ( )
if folder . Paused {
r . folderPausedLocked ( folder . ID )
} else {
2021-06-17 06:57:24 +00:00
r . folderRunningLocked ( folder , fset , runner )
2021-06-03 12:58:50 +00:00
}
r . mut . Unlock ( )
}
// folderPausedLocked stops a running index handler.
// It is a noop if the folder isn't known or has not been started yet.
func ( r * indexHandlerRegistry ) folderPausedLocked ( folder string ) {
2023-07-29 08:24:44 +00:00
l . Debugf ( "Pausing index handler for device %v and folder %v" , r . conn . DeviceID ( ) . Short ( ) , folder )
2021-06-03 12:58:50 +00:00
delete ( r . folderStates , folder )
2023-08-21 16:39:13 +00:00
if is , ok := r . indexHandlers . Get ( folder ) ; ok {
2021-06-03 12:58:50 +00:00
is . pause ( )
2023-07-29 08:24:44 +00:00
l . Debugf ( "Paused index handler for device %v and folder %v" , r . conn . DeviceID ( ) . Short ( ) , folder )
2021-06-03 12:58:50 +00:00
} else {
2023-07-29 08:24:44 +00:00
l . Debugf ( "No index handler for device %v and folder %v to pause" , r . conn . DeviceID ( ) . Short ( ) , folder )
2021-06-03 12:58:50 +00:00
}
}
2021-06-17 06:57:24 +00:00
// folderRunningLocked resumes an already running index handler or starts it, if it
2021-06-03 12:58:50 +00:00
// was added while paused.
// It is a noop if the folder isn't known.
2021-06-17 06:57:24 +00:00
func ( r * indexHandlerRegistry ) folderRunningLocked ( folder config . FolderConfiguration , fset * db . FileSet , runner service ) {
2021-06-03 12:58:50 +00:00
r . folderStates [ folder . ID ] = & indexHandlerFolderState {
cfg : folder ,
fset : fset ,
runner : runner ,
}
2023-08-21 16:39:13 +00:00
is , isOk := r . indexHandlers . Get ( folder . ID )
2021-06-03 12:58:50 +00:00
if info , ok := r . startInfos [ folder . ID ] ; ok {
if isOk {
2023-08-21 16:39:13 +00:00
r . indexHandlers . RemoveAndWait ( folder . ID , 0 )
2023-07-29 08:24:44 +00:00
l . Debugf ( "Removed index handler for device %v and folder %v in resume" , r . conn . DeviceID ( ) . Short ( ) , folder . ID )
2021-06-03 12:58:50 +00:00
}
r . startLocked ( folder , fset , runner , info )
delete ( r . startInfos , folder . ID )
2023-07-29 08:24:44 +00:00
l . Debugf ( "Started index handler for device %v and folder %v in resume" , r . conn . DeviceID ( ) . Short ( ) , folder . ID )
2021-06-03 12:58:50 +00:00
} else if isOk {
2023-07-29 08:24:44 +00:00
l . Debugf ( "Resuming index handler for device %v and folder %v" , r . conn . DeviceID ( ) . Short ( ) , folder )
2021-06-03 12:58:50 +00:00
is . resume ( fset , runner )
} else {
2023-07-29 08:24:44 +00:00
l . Debugf ( "Not resuming index handler for device %v and folder %v as none is paused and there is no start info" , r . conn . DeviceID ( ) . Short ( ) , folder . ID )
2021-06-03 12:58:50 +00:00
}
}
2024-08-28 13:00:19 +00:00
func ( r * indexHandlerRegistry ) ReceiveIndex ( folder string , fs [ ] protocol . FileInfo , update bool , op string , prevSequence , lastSequence int64 ) error {
2021-06-03 12:58:50 +00:00
r . mut . Lock ( )
defer r . mut . Unlock ( )
2023-08-21 16:39:13 +00:00
is , isOk := r . indexHandlers . Get ( folder )
2021-06-03 12:58:50 +00:00
if ! isOk {
l . Infof ( "%v for nonexistent or paused folder %q" , op , folder )
2023-07-29 08:24:44 +00:00
return fmt . Errorf ( "%s: %w" , folder , ErrFolderMissing )
2021-06-03 12:58:50 +00:00
}
2024-08-28 13:00:19 +00:00
return is . receive ( fs , update , op , prevSequence , lastSequence )
2021-06-03 12:58:50 +00:00
}
// makeForgetUpdate builds, from the files of an index update, a download
// progress update of type "forget" for every file that is neither a
// symlink, nor a directory, nor deleted. The result makes us forget any
// download progress for the files we've just been sent.
func makeForgetUpdate(files []protocol.FileInfo) []protocol.FileDownloadProgressUpdate {
	forget := make([]protocol.FileDownloadProgressUpdate, 0, len(files))
	for i := range files {
		file := files[i]
		if !file.IsSymlink() && !file.IsDirectory() && !file.IsDeleted() {
			forget = append(forget, protocol.FileDownloadProgressUpdate{
				Name:       file.Name,
				Version:    file.Version,
				UpdateType: protocol.FileDownloadProgressUpdateTypeForget,
			})
		}
	}
	return forget
}