mirror of
https://github.com/octoleo/syncthing.git
synced 2025-01-22 22:58:25 +00:00
lib/folder: Clear pull errors when nothing is needed anymore (#7093)
This commit is contained in:
parent
a08a1b6998
commit
cc9ea9db89
@ -55,8 +55,6 @@ type folder struct {
|
||||
scanTimer *time.Timer
|
||||
scanDelay chan time.Duration
|
||||
initialScanFinished chan struct{}
|
||||
scanErrors []FileError
|
||||
scanErrorsMut sync.Mutex
|
||||
versionCleanupInterval time.Duration
|
||||
versionCleanupTimer *time.Timer
|
||||
|
||||
@ -64,6 +62,10 @@ type folder struct {
|
||||
pullPause time.Duration
|
||||
pullFailTimer *time.Timer
|
||||
|
||||
scanErrors []FileError
|
||||
pullErrors []FileError
|
||||
errorsMut sync.Mutex
|
||||
|
||||
doInSyncChan chan syncRequest
|
||||
|
||||
forcedRescanRequested chan struct{}
|
||||
@ -106,12 +108,13 @@ func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg conf
|
||||
scanTimer: time.NewTimer(0), // The first scan should be done immediately.
|
||||
scanDelay: make(chan time.Duration),
|
||||
initialScanFinished: make(chan struct{}),
|
||||
scanErrorsMut: sync.NewMutex(),
|
||||
versionCleanupInterval: time.Duration(cfg.Versioning.CleanupIntervalS) * time.Second,
|
||||
versionCleanupTimer: time.NewTimer(time.Duration(cfg.Versioning.CleanupIntervalS) * time.Second),
|
||||
|
||||
pullScheduled: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a pull if we're busy when it comes.
|
||||
|
||||
errorsMut: sync.NewMutex(),
|
||||
|
||||
doInSyncChan: make(chan syncRequest),
|
||||
|
||||
forcedRescanRequested: make(chan struct{}, 1),
|
||||
@ -333,6 +336,10 @@ func (f *folder) pull() (success bool) {
|
||||
})
|
||||
snap.Release()
|
||||
if abort {
|
||||
// Clears pull failures on items that were needed before, but aren't anymore.
|
||||
f.errorsMut.Lock()
|
||||
f.pullErrors = nil
|
||||
f.errorsMut.Unlock()
|
||||
return true
|
||||
}
|
||||
|
||||
@ -967,18 +974,18 @@ func (f *folder) String() string {
|
||||
}
|
||||
|
||||
func (f *folder) newScanError(path string, err error) {
|
||||
f.scanErrorsMut.Lock()
|
||||
f.errorsMut.Lock()
|
||||
l.Infof("Scanner (folder %s, item %q): %v", f.Description(), path, err)
|
||||
f.scanErrors = append(f.scanErrors, FileError{
|
||||
Err: err.Error(),
|
||||
Path: path,
|
||||
})
|
||||
f.scanErrorsMut.Unlock()
|
||||
f.errorsMut.Unlock()
|
||||
}
|
||||
|
||||
func (f *folder) clearScanErrors(subDirs []string) {
|
||||
f.scanErrorsMut.Lock()
|
||||
defer f.scanErrorsMut.Unlock()
|
||||
f.errorsMut.Lock()
|
||||
defer f.errorsMut.Unlock()
|
||||
if len(subDirs) == 0 {
|
||||
f.scanErrors = nil
|
||||
return
|
||||
@ -997,9 +1004,14 @@ outer:
|
||||
}
|
||||
|
||||
func (f *folder) Errors() []FileError {
|
||||
f.scanErrorsMut.Lock()
|
||||
defer f.scanErrorsMut.Unlock()
|
||||
return append([]FileError{}, f.scanErrors...)
|
||||
f.errorsMut.Lock()
|
||||
defer f.errorsMut.Unlock()
|
||||
scanLen := len(f.scanErrors)
|
||||
errors := make([]FileError, scanLen+len(f.pullErrors))
|
||||
copy(errors[:scanLen], f.scanErrors)
|
||||
copy(errors[scanLen:], f.pullErrors)
|
||||
sort.Sort(fileErrorList(errors))
|
||||
return errors
|
||||
}
|
||||
|
||||
// ScheduleForceRescan marks the file such that it gets rehashed on next scan, and schedules a scan.
|
||||
|
@ -128,9 +128,7 @@ type sendReceiveFolder struct {
|
||||
blockPullReorderer blockPullReorderer
|
||||
writeLimiter *byteSemaphore
|
||||
|
||||
pullErrors map[string]string // actual exposed pull errors
|
||||
tempPullErrors map[string]string // pull errors that might be just transient
|
||||
pullErrorsMut sync.Mutex
|
||||
}
|
||||
|
||||
func newSendReceiveFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, ver versioner.Versioner, fs fs.Filesystem, evLogger events.Logger, ioLimiter *byteSemaphore) service {
|
||||
@ -140,7 +138,6 @@ func newSendReceiveFolder(model *model, fset *db.FileSet, ignores *ignore.Matche
|
||||
queue: newJobQueue(),
|
||||
blockPullReorderer: newBlockPullReorderer(cfg.BlockPullOrder, model.id, cfg.DeviceIDs()),
|
||||
writeLimiter: newByteSemaphore(cfg.MaxConcurrentWrites),
|
||||
pullErrorsMut: sync.NewMutex(),
|
||||
}
|
||||
f.folder.puller = f
|
||||
f.folder.Service = util.AsService(f.serve, f.String())
|
||||
@ -177,9 +174,9 @@ func (f *sendReceiveFolder) pull() bool {
|
||||
|
||||
changed := 0
|
||||
|
||||
f.pullErrorsMut.Lock()
|
||||
f.errorsMut.Lock()
|
||||
f.pullErrors = nil
|
||||
f.pullErrorsMut.Unlock()
|
||||
f.errorsMut.Unlock()
|
||||
|
||||
for tries := 0; tries < maxPullerIterations; tries++ {
|
||||
select {
|
||||
@ -204,14 +201,20 @@ func (f *sendReceiveFolder) pull() bool {
|
||||
}
|
||||
}
|
||||
|
||||
f.pullErrorsMut.Lock()
|
||||
f.pullErrors = f.tempPullErrors
|
||||
f.tempPullErrors = nil
|
||||
for path, err := range f.pullErrors {
|
||||
l.Infof("Puller (folder %s, item %q): %v", f.Description(), path, err)
|
||||
f.errorsMut.Lock()
|
||||
pullErrNum := len(f.tempPullErrors)
|
||||
if pullErrNum > 0 {
|
||||
f.pullErrors = make([]FileError, 0, len(f.tempPullErrors))
|
||||
for path, err := range f.tempPullErrors {
|
||||
l.Infof("Puller (folder %s, item %q): %v", f.Description(), path, err)
|
||||
f.pullErrors = append(f.pullErrors, FileError{
|
||||
Err: err,
|
||||
Path: path,
|
||||
})
|
||||
}
|
||||
f.tempPullErrors = nil
|
||||
}
|
||||
pullErrNum := len(f.pullErrors)
|
||||
f.pullErrorsMut.Unlock()
|
||||
f.errorsMut.Unlock()
|
||||
|
||||
if pullErrNum > 0 {
|
||||
l.Infof("%v: Failed to sync %v items", f.Description(), pullErrNum)
|
||||
@ -229,9 +232,9 @@ func (f *sendReceiveFolder) pull() bool {
|
||||
// might have failed). One puller iteration handles all files currently
|
||||
// flagged as needed in the folder.
|
||||
func (f *sendReceiveFolder) pullerIteration(scanChan chan<- string) int {
|
||||
f.pullErrorsMut.Lock()
|
||||
f.errorsMut.Lock()
|
||||
f.tempPullErrors = make(map[string]string)
|
||||
f.pullErrorsMut.Unlock()
|
||||
f.errorsMut.Unlock()
|
||||
|
||||
snap := f.fset.Snapshot()
|
||||
defer snap.Release()
|
||||
@ -1788,8 +1791,8 @@ func (f *sendReceiveFolder) newPullError(path string, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
f.pullErrorsMut.Lock()
|
||||
defer f.pullErrorsMut.Unlock()
|
||||
f.errorsMut.Lock()
|
||||
defer f.errorsMut.Unlock()
|
||||
|
||||
// We might get more than one error report for a file (i.e. error on
|
||||
// Write() followed by Close()); we keep the first error as that is
|
||||
@ -1807,19 +1810,6 @@ func (f *sendReceiveFolder) newPullError(path string, err error) {
|
||||
l.Debugf("%v new error for %v: %v", f, path, err)
|
||||
}
|
||||
|
||||
func (f *sendReceiveFolder) Errors() []FileError {
|
||||
scanErrors := f.folder.Errors()
|
||||
f.pullErrorsMut.Lock()
|
||||
errors := make([]FileError, 0, len(f.pullErrors)+len(f.scanErrors))
|
||||
for path, err := range f.pullErrors {
|
||||
errors = append(errors, FileError{path, err})
|
||||
}
|
||||
f.pullErrorsMut.Unlock()
|
||||
errors = append(errors, scanErrors...)
|
||||
sort.Sort(fileErrorList(errors))
|
||||
return errors
|
||||
}
|
||||
|
||||
// deleteItemOnDisk deletes the file represented by old that is about to be replaced by new.
|
||||
func (f *sendReceiveFolder) deleteItemOnDisk(item protocol.FileInfo, snap *db.Snapshot, scanChan chan<- string) (err error) {
|
||||
defer func() {
|
||||
|
@ -96,7 +96,6 @@ func setupSendReceiveFolder(files ...protocol.FileInfo) (*model, *sendReceiveFol
|
||||
model := setupModel(w)
|
||||
model.Supervisor.Stop()
|
||||
f := model.folderRunners[fcfg.ID].(*sendReceiveFolder)
|
||||
f.pullErrors = make(map[string]string)
|
||||
f.tempPullErrors = make(map[string]string)
|
||||
f.ctx = context.Background()
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user