This change adds a separate config for the cleanup interval, and runs that cleanup from the main folder loop.
This commit is contained in:
parent 245b4b98c4
commit aedc2d788f
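In outline, the change below gives each folder a version-cleanup timer that is serviced from the folder's main select loop, with the interval coming from the new Versioning.CleanupIntervalS setting (0 disables cleanup). The following is a minimal, self-contained sketch of that shape, not the actual Syncthing code; the folder type, the cleaner interface and the logging here are invented for illustration only.

package main

import (
	"context"
	"fmt"
	"time"
)

// cleaner is a stand-in for the Versioner interface this commit extends
// with a context-aware Clean method (hypothetical, simplified).
type cleaner interface {
	Clean(ctx context.Context) error
}

type noopCleaner struct{}

func (noopCleaner) Clean(ctx context.Context) error {
	fmt.Println("cleaning old versions")
	return nil
}

type folder struct {
	versioner       cleaner
	cleanupInterval time.Duration // would come from Versioning.CleanupIntervalS
}

// serve sketches the folder main loop: version cleanup is one more timer
// case alongside scanning and pulling.
func (f *folder) serve(ctx context.Context) {
	cleanupTimer := time.NewTimer(f.cleanupInterval)
	defer cleanupTimer.Stop()

	// A zero interval or a missing versioner disables cleanup: stop and
	// drain the timer so its case never fires.
	if f.cleanupInterval == 0 || f.versioner == nil {
		if !cleanupTimer.Stop() {
			<-cleanupTimer.C
		}
	}

	for {
		select {
		case <-ctx.Done():
			return
		case <-cleanupTimer.C:
			if err := f.versioner.Clean(ctx); err != nil {
				fmt.Println("cleanup failed:", err)
			}
			cleanupTimer.Reset(f.cleanupInterval)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()
	f := &folder{versioner: noopCleaner{}, cleanupInterval: 100 * time.Millisecond}
	f.serve(ctx)
}

Driving the cleanup from the existing loop means it shares the folder's lifetime and shutdown path instead of needing a separately supervised goroutine per versioner, which is what the suture.Service plumbing removed from the versioners below used to provide.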
@@ -314,6 +314,8 @@
 <span ng-switch-when="unknown"><span class="hidden-xs" translate>Unknown</span><span class="visible-xs" aria-label="{{'Unknown' | translate}}"><i class="fas fa-fw fa-question-circle"></i></span></span>
 <span ng-switch-when="unshared"><span class="hidden-xs" translate>Unshared</span><span class="visible-xs" aria-label="{{'Unshared' | translate}}"><i class="fas fa-fw fa-unlink"></i></span></span>
 <span ng-switch-when="scan-waiting"><span class="hidden-xs" translate>Waiting to Scan</span><span class="visible-xs" aria-label="{{'Waiting to Scan' | translate}}"><i class="fas fa-fw fa-hourglass-half"></i></span></span>
+<span ng-switch-when="cleaning"><span class="hidden-xs" translate>Cleaning Versions</span><span class="visible-xs" aria-label="{{'Cleaning Versions' | translate}}"><i class="fas fa-fw fa-recycle"></i></span></span>
+<span ng-switch-when="clean-waiting"><span class="hidden-xs" translate>Waiting to Clean</span><span class="visible-xs" aria-label="{{'Waiting to Clean' | translate}}"><i class="fas fa-fw fa-hourglass-half"></i></span></span>
 <span ng-switch-when="stopped"><span class="hidden-xs" translate>Stopped</span><span class="visible-xs" aria-label="{{'Stopped' | translate}}"><i class="fas fa-fw fa-stop"></i></span></span>
 <span ng-switch-when="scanning">
 <span class="hidden-xs" translate>Scanning</span>
@@ -824,7 +824,7 @@ angular.module('syncthing.core')
 if (status == 'paused') {
 return 'default';
 }
-if (status === 'syncing' || status === 'sync-preparing' || status === 'scanning') {
+if (status === 'syncing' || status === 'sync-preparing' || status === 'scanning' || status === 'cleaning') {
 return 'primary';
 }
 if (status === 'unknown') {
@@ -833,7 +833,7 @@ angular.module('syncthing.core')
 if (status === 'stopped' || status === 'outofsync' || status === 'error' || status === 'faileditems') {
 return 'danger';
 }
-if (status === 'unshared' || status === 'scan-waiting' || status === 'sync-waiting') {
+if (status === 'unshared' || status === 'scan-waiting' || status === 'sync-waiting' || status === 'clean-waiting') {
 return 'warning';
 }
@@ -226,6 +226,11 @@ func (f *FolderConfiguration) prepare() {
 if f.Versioning.Params == nil {
 f.Versioning.Params = make(map[string]string)
 }
+if f.Versioning.CleanupIntervalS > MaxRescanIntervalS {
+f.Versioning.CleanupIntervalS = MaxRescanIntervalS
+} else if f.Versioning.CleanupIntervalS < 0 {
+f.Versioning.CleanupIntervalS = 0
+}

 if f.WeakHashThresholdPct == 0 {
 f.WeakHashThresholdPct = 25
@@ -7,21 +7,28 @@
 package config

 import (
+"encoding/json"
 "encoding/xml"
 "sort"
+
+"github.com/syncthing/syncthing/lib/util"
 )

+// VersioningConfiguration is used in the code and for JSON serialization
 type VersioningConfiguration struct {
-Type string `xml:"type,attr" json:"type"`
-Params map[string]string `json:"params"`
+Type string `json:"type"`
+Params map[string]string `json:"params"`
+CleanupIntervalS int `json:"cleanupIntervalS" default:"3600"`
 }

-type InternalVersioningConfiguration struct {
-Type string `xml:"type,attr,omitempty"`
-Params []InternalParam `xml:"param"`
+// internalVersioningConfiguration is used in XML serialization
+type internalVersioningConfiguration struct {
+Type string `xml:"type,attr,omitempty"`
+Params []internalParam `xml:"param"`
+CleanupIntervalS int `xml:"cleanupIntervalS" default:"3600"`
 }

-type InternalParam struct {
+type internalParam struct {
 Key string `xml:"key,attr"`
 Val string `xml:"val,attr"`
 }
@@ -35,31 +42,45 @@ func (c VersioningConfiguration) Copy() VersioningConfiguration {
 return cp
 }

+func (c *VersioningConfiguration) UnmarshalJSON(data []byte) error {
+util.SetDefaults(c)
+type noCustomUnmarshal VersioningConfiguration
+ptr := (*noCustomUnmarshal)(c)
+return json.Unmarshal(data, ptr)
+}
+
+func (c *VersioningConfiguration) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
+var intCfg internalVersioningConfiguration
+util.SetDefaults(&intCfg)
+if err := d.DecodeElement(&intCfg, &start); err != nil {
+return err
+}
+c.fromInternal(intCfg)
+return nil
+}
+
 func (c *VersioningConfiguration) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
-var tmp InternalVersioningConfiguration
+return e.Encode(c.toInternal())
+}
+
+func (c *VersioningConfiguration) toInternal() internalVersioningConfiguration {
+var tmp internalVersioningConfiguration
 tmp.Type = c.Type
+tmp.CleanupIntervalS = c.CleanupIntervalS
 for k, v := range c.Params {
-tmp.Params = append(tmp.Params, InternalParam{k, v})
+tmp.Params = append(tmp.Params, internalParam{k, v})
 }
 sort.Slice(tmp.Params, func(a, b int) bool {
 return tmp.Params[a].Key < tmp.Params[b].Key
 })

-return e.EncodeElement(tmp, start)
+return tmp
 }

-func (c *VersioningConfiguration) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
-var tmp InternalVersioningConfiguration
-err := d.DecodeElement(&tmp, &start)
-if err != nil {
-return err
-}
-
-c.Type = tmp.Type
-c.Params = make(map[string]string, len(tmp.Params))
-for _, p := range tmp.Params {
+func (c *VersioningConfiguration) fromInternal(intCfg internalVersioningConfiguration) {
+c.Type = intCfg.Type
+c.CleanupIntervalS = intCfg.CleanupIntervalS
+c.Params = make(map[string]string, len(intCfg.Params))
+for _, p := range intCfg.Params {
 c.Params[p.Key] = p.Val
 }
-return nil
 }
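The config hunk above keeps a public, map-based VersioningConfiguration for JSON and converts to an internal slice-based struct for XML, applying defaults before decoding. Below is a rough, self-contained sketch of that marshal/unmarshal pattern; the type names are invented and the tag-driven default (Syncthing's util.SetDefaults) is replaced here by a plain assignment, so this only mirrors the shape of the technique.

package main

import (
	"encoding/xml"
	"fmt"
	"sort"
)

// Public shape used by the rest of the code: a map plus a default-able field.
type VersioningConfig struct {
	Type             string
	Params           map[string]string
	CleanupIntervalS int
}

// Internal shape used only for XML: repeated <param> elements.
type xmlVersioningConfig struct {
	Type             string     `xml:"type,attr,omitempty"`
	Params           []xmlParam `xml:"param"`
	CleanupIntervalS int        `xml:"cleanupIntervalS"`
}

type xmlParam struct {
	Key string `xml:"key,attr"`
	Val string `xml:"val,attr"`
}

func (c VersioningConfig) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
	tmp := xmlVersioningConfig{Type: c.Type, CleanupIntervalS: c.CleanupIntervalS}
	for k, v := range c.Params {
		tmp.Params = append(tmp.Params, xmlParam{k, v})
	}
	// Sort for deterministic output, since map iteration order is random.
	sort.Slice(tmp.Params, func(a, b int) bool { return tmp.Params[a].Key < tmp.Params[b].Key })
	return e.EncodeElement(tmp, start)
}

func (c *VersioningConfig) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
	tmp := xmlVersioningConfig{CleanupIntervalS: 3600} // stand-in for tag-driven defaults
	if err := d.DecodeElement(&tmp, &start); err != nil {
		return err
	}
	c.Type = tmp.Type
	c.CleanupIntervalS = tmp.CleanupIntervalS
	c.Params = make(map[string]string, len(tmp.Params))
	for _, p := range tmp.Params {
		c.Params[p.Key] = p.Val
	}
	return nil
}

func main() {
	in := VersioningConfig{Type: "staggered", Params: map[string]string{"maxAge": "31536000"}, CleanupIntervalS: 600}
	out, _ := xml.Marshal(in)
	fmt.Println(string(out))

	var back VersioningConfig
	_ = xml.Unmarshal(out, &back)
	fmt.Printf("%+v\n", back)
}

Round-tripping through the internal type keeps the public API map-shaped while the XML stays a list of param elements, and sorting the params makes the serialized output deterministic.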
@@ -29,6 +29,7 @@ import (
 "github.com/syncthing/syncthing/lib/stats"
 "github.com/syncthing/syncthing/lib/sync"
 "github.com/syncthing/syncthing/lib/util"
+"github.com/syncthing/syncthing/lib/versioner"
 "github.com/syncthing/syncthing/lib/watchaggregator"

 "github.com/thejerf/suture"
@@ -49,12 +50,14 @@ type folder struct {
 ignores *ignore.Matcher
 ctx context.Context

-scanInterval time.Duration
-scanTimer *time.Timer
-scanDelay chan time.Duration
-initialScanFinished chan struct{}
-scanErrors []FileError
-scanErrorsMut sync.Mutex
+scanInterval time.Duration
+scanTimer *time.Timer
+scanDelay chan time.Duration
+initialScanFinished chan struct{}
+scanErrors []FileError
+scanErrorsMut sync.Mutex
+versionCleanupInterval time.Duration
+versionCleanupTimer *time.Timer

 pullScheduled chan struct{}
 pullPause time.Duration
@@ -72,7 +75,8 @@ type folder struct {
 watchErr error
 watchMut sync.Mutex

-puller puller
+puller puller
+versioner versioner.Versioner
 }

 type syncRequest struct {
@@ -81,10 +85,10 @@ type syncRequest struct {
 }

 type puller interface {
-pull() bool // true when successfull and should not be retried
+pull() bool // true when successful and should not be retried
 }

-func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, evLogger events.Logger, ioLimiter *byteSemaphore) folder {
+func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, evLogger events.Logger, ioLimiter *byteSemaphore, ver versioner.Versioner) folder {
 f := folder{
 stateTracker: newStateTracker(cfg.ID, evLogger),
 FolderConfiguration: cfg,
@@ -96,11 +100,13 @@ func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg conf
 fset: fset,
 ignores: ignores,

-scanInterval: time.Duration(cfg.RescanIntervalS) * time.Second,
-scanTimer: time.NewTimer(time.Millisecond), // The first scan should be done immediately.
-scanDelay: make(chan time.Duration),
-initialScanFinished: make(chan struct{}),
-scanErrorsMut: sync.NewMutex(),
+scanInterval: time.Duration(cfg.RescanIntervalS) * time.Second,
+scanTimer: time.NewTimer(0), // The first scan should be done immediately.
+scanDelay: make(chan time.Duration),
+initialScanFinished: make(chan struct{}),
+scanErrorsMut: sync.NewMutex(),
+versionCleanupInterval: time.Duration(cfg.Versioning.CleanupIntervalS) * time.Second,
+versionCleanupTimer: time.NewTimer(time.Duration(cfg.Versioning.CleanupIntervalS) * time.Second),

 pullScheduled: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a pull if we're busy when it comes.

@@ -113,6 +119,8 @@ func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg conf
 watchCancel: func() {},
 restartWatchChan: make(chan struct{}, 1),
 watchMut: sync.NewMutex(),
+
+versioner: ver,
 }
 f.pullPause = f.pullBasePause()
 f.pullFailTimer = time.NewTimer(0)
@@ -131,6 +139,7 @@ func (f *folder) serve(ctx context.Context) {

 defer func() {
 f.scanTimer.Stop()
+f.versionCleanupTimer.Stop()
 f.setState(FolderIdle)
 }()

@@ -138,6 +147,14 @@ func (f *folder) serve(ctx context.Context) {
 f.startWatch()
 }

+// If we're configured to not do version cleanup, or we don't have a
+// versioner, cancel and drain that timer now.
+if f.versionCleanupInterval == 0 || f.versioner == nil {
+if !f.versionCleanupTimer.Stop() {
+<-f.versionCleanupTimer.C
+}
+}
+
 initialCompleted := f.initialScanFinished

 for {
@@ -181,6 +198,10 @@ func (f *folder) serve(ctx context.Context) {
 case <-f.restartWatchChan:
 l.Debugln(f, "Restart watcher")
 f.restartWatch()
+
+case <-f.versionCleanupTimer.C:
+l.Debugln(f, "Doing version cleanup")
+f.versionCleanupTimerFired()
 }
 }
 }
@@ -701,6 +722,24 @@ func (f *folder) scanTimerFired() {
 f.Reschedule()
 }

+func (f *folder) versionCleanupTimerFired() {
+f.setState(FolderCleanWaiting)
+defer f.setState(FolderIdle)
+
+if err := f.ioLimiter.takeWithContext(f.ctx, 1); err != nil {
+return
+}
+defer f.ioLimiter.give(1)
+
+f.setState(FolderCleaning)
+
+if err := f.versioner.Clean(f.ctx); err != nil {
+l.Infoln("Failed to clean versions in %s: %v", f.Description(), err)
+}
+
+f.versionCleanupTimer.Reset(f.versionCleanupInterval)
+}
+
 func (f *folder) WatchError() error {
 f.watchMut.Lock()
 defer f.watchMut.Unlock()
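An easy-to-miss detail in the hunks above is the stop-and-drain sequence used when cleanup is disabled: time.Timer.Stop reports false if the timer already fired, in which case the pending value must be drained from the channel or a later select case could still see a stale tick. A standalone illustration of just that idiom, unrelated to any Syncthing types:

package main

import (
	"fmt"
	"time"
)

func main() {
	t := time.NewTimer(10 * time.Millisecond)

	// Let the timer fire so Stop() will return false below.
	time.Sleep(20 * time.Millisecond)

	// Stop returns false if the timer already fired or was stopped. The
	// fired value may be sitting in the channel; draining it guarantees a
	// later select on t.C cannot observe a stale tick.
	if !t.Stop() {
		<-t.C
	}

	select {
	case <-t.C:
		fmt.Println("unexpected: stale tick received")
	default:
		fmt.Println("timer stopped and drained; no pending tick")
	}
}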
@@ -27,7 +27,7 @@ type sendOnlyFolder struct {

 func newSendOnlyFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, _ versioner.Versioner, _ fs.Filesystem, evLogger events.Logger, ioLimiter *byteSemaphore) service {
 f := &sendOnlyFolder{
-folder: newFolder(model, fset, ignores, cfg, evLogger, ioLimiter),
+folder: newFolder(model, fset, ignores, cfg, evLogger, ioLimiter, nil),
 }
 f.folder.puller = f
 f.folder.Service = util.AsService(f.serve, f.String())
@@ -123,8 +123,7 @@ type dbUpdateJob struct {
 type sendReceiveFolder struct {
 folder

-fs fs.Filesystem
-versioner versioner.Versioner
+fs fs.Filesystem

 queue *jobQueue
 blockPullReorderer blockPullReorderer
@@ -137,9 +136,8 @@ type sendReceiveFolder struct {

 func newSendReceiveFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, ver versioner.Versioner, fs fs.Filesystem, evLogger events.Logger, ioLimiter *byteSemaphore) service {
 f := &sendReceiveFolder{
-folder: newFolder(model, fset, ignores, cfg, evLogger, ioLimiter),
+folder: newFolder(model, fset, ignores, cfg, evLogger, ioLimiter, ver),
 fs: fs,
-versioner: ver,
 queue: newJobQueue(),
 blockPullReorderer: newBlockPullReorderer(cfg.BlockPullOrder, model.id, cfg.DeviceIDs()),
 writeLimiter: newByteSemaphore(cfg.MaxConcurrentWrites),
@@ -22,6 +22,8 @@ const (
 FolderSyncWaiting
 FolderSyncPreparing
 FolderSyncing
+FolderCleaning
+FolderCleanWaiting
 FolderError
 )

@@ -39,6 +41,10 @@ func (s folderState) String() string {
 return "sync-preparing"
 case FolderSyncing:
 return "syncing"
+case FolderCleaning:
+return "cleaning"
+case FolderCleanWaiting:
+return "clean-waiting"
 case FolderError:
 return "error"
 default:
@@ -133,15 +133,15 @@ type model struct {
 folderIOLimiter *byteSemaphore

 // fields protected by fmut
-fmut sync.RWMutex
-folderCfgs map[string]config.FolderConfiguration // folder -> cfg
-folderFiles map[string]*db.FileSet // folder -> files
-deviceStatRefs map[protocol.DeviceID]*stats.DeviceStatisticsReference // deviceID -> statsRef
-folderIgnores map[string]*ignore.Matcher // folder -> matcher object
-folderRunners map[string]service // folder -> puller or scanner
-folderRunnerTokens map[string][]suture.ServiceToken // folder -> tokens for puller or scanner
-folderRestartMuts syncMutexMap // folder -> restart mutex
-folderVersioners map[string]versioner.Versioner // folder -> versioner (may be nil)
+fmut sync.RWMutex
+folderCfgs map[string]config.FolderConfiguration // folder -> cfg
+folderFiles map[string]*db.FileSet // folder -> files
+deviceStatRefs map[protocol.DeviceID]*stats.DeviceStatisticsReference // deviceID -> statsRef
+folderIgnores map[string]*ignore.Matcher // folder -> matcher object
+folderRunners map[string]service // folder -> puller or scanner
+folderRunnerToken map[string]suture.ServiceToken // folder -> token for folder runner
+folderRestartMuts syncMutexMap // folder -> restart mutex
+folderVersioners map[string]versioner.Versioner // folder -> versioner (may be nil)

 // fields protected by pmut
 pmut sync.RWMutex
@@ -207,14 +207,14 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio
 folderIOLimiter: newByteSemaphore(cfg.Options().MaxFolderConcurrency()),

 // fields protected by fmut
-fmut: sync.NewRWMutex(),
-folderCfgs: make(map[string]config.FolderConfiguration),
-folderFiles: make(map[string]*db.FileSet),
-deviceStatRefs: make(map[protocol.DeviceID]*stats.DeviceStatisticsReference),
-folderIgnores: make(map[string]*ignore.Matcher),
-folderRunners: make(map[string]service),
-folderRunnerTokens: make(map[string][]suture.ServiceToken),
-folderVersioners: make(map[string]versioner.Versioner),
+fmut: sync.NewRWMutex(),
+folderCfgs: make(map[string]config.FolderConfiguration),
+folderFiles: make(map[string]*db.FileSet),
+deviceStatRefs: make(map[protocol.DeviceID]*stats.DeviceStatisticsReference),
+folderIgnores: make(map[string]*ignore.Matcher),
+folderRunners: make(map[string]service),
+folderRunnerToken: make(map[string]suture.ServiceToken),
+folderVersioners: make(map[string]versioner.Versioner),

 // fields protected by pmut
 pmut: sync.NewRWMutex(),
@@ -346,13 +346,6 @@ func (m *model) addAndStartFolderLockedWithIgnores(cfg config.FolderConfiguratio
 if err != nil {
 panic(fmt.Errorf("creating versioner: %w", err))
 }
-if service, ok := ver.(suture.Service); ok {
-// The versioner implements the suture.Service interface, so
-// expects to be run in the background in addition to being called
-// when files are going to be archived.
-token := m.Add(service)
-m.folderRunnerTokens[folder] = append(m.folderRunnerTokens[folder], token)
-}
 }
 m.folderVersioners[folder] = ver
@@ -362,8 +355,7 @@ func (m *model) addAndStartFolderLockedWithIgnores(cfg config.FolderConfiguratio

 m.warnAboutOverwritingProtectedFiles(cfg, ignores)

-token := m.Add(p)
-m.folderRunnerTokens[folder] = append(m.folderRunnerTokens[folder], token)
+m.folderRunnerToken[folder] = m.Add(p)

 l.Infof("Ready to synchronize %s (%s)", cfg.Description(), cfg.Type)
 }
@@ -430,11 +422,11 @@ func (m *model) stopFolder(cfg config.FolderConfiguration, err error) {
 // Stop the services running for this folder and wait for them to finish
 // stopping to prevent races on restart.
 m.fmut.RLock()
-tokens := m.folderRunnerTokens[cfg.ID]
+token, ok := m.folderRunnerToken[cfg.ID]
 m.fmut.RUnlock()

-for _, id := range tokens {
-m.RemoveAndWait(id, 0)
+if ok {
+m.RemoveAndWait(token, 0)
 }

 // Wait for connections to stop to ensure that no more calls to methods
@@ -449,7 +441,7 @@ func (m *model) cleanupFolderLocked(cfg config.FolderConfiguration) {
 delete(m.folderFiles, cfg.ID)
 delete(m.folderIgnores, cfg.ID)
 delete(m.folderRunners, cfg.ID)
-delete(m.folderRunnerTokens, cfg.ID)
+delete(m.folderRunnerToken, cfg.ID)
 delete(m.folderVersioners, cfg.ID)
 }
@@ -7,6 +7,7 @@
 package versioner

 import (
+"context"
 "errors"
 "os"
 "os/exec"
@@ -115,3 +116,7 @@ func (v external) GetVersions() (map[string][]FileVersion, error) {
 func (v external) Restore(filePath string, versionTime time.Time) error {
 return ErrRestorationNotSupported
 }
+
+func (v external) Clean(_ context.Context) error {
+return nil
+}
@@ -7,6 +7,7 @@
 package versioner

 import (
+"context"
 "strconv"
 "time"

@@ -73,3 +74,7 @@ func (v simple) GetVersions() (map[string][]FileVersion, error) {
 func (v simple) Restore(filepath string, versionTime time.Time) error {
 return restoreFile(v.copyRangeMethod, v.versionsFs, v.folderFs, filepath, versionTime, TagFilename)
 }
+
+func (v simple) Clean(_ context.Context) error {
+return nil
+}
@@ -13,12 +13,8 @@ import (
 "strconv"
 "time"

-"github.com/thejerf/suture"
-
 "github.com/syncthing/syncthing/lib/config"
 "github.com/syncthing/syncthing/lib/fs"
-"github.com/syncthing/syncthing/lib/sync"
-"github.com/syncthing/syncthing/lib/util"
 )

 func init() {
@@ -32,15 +28,10 @@ type interval struct {
 }

 type staggered struct {
-suture.Service
-cleanInterval int64
 folderFs fs.Filesystem
 versionsFs fs.Filesystem
 interval [4]interval
 copyRangeMethod fs.CopyRangeMethod
-mutex sync.Mutex
-
-testCleanDone chan struct{}
 }

 func newStaggered(cfg config.FolderConfiguration) Versioner {
@@ -49,19 +40,14 @@ func newStaggered(cfg config.FolderConfiguration) Versioner {
 if err != nil {
 maxAge = 31536000 // Default: ~1 year
 }
-cleanInterval, err := strconv.ParseInt(params["cleanInterval"], 10, 0)
-if err != nil {
-cleanInterval = 3600 // Default: clean once per hour
-}

 // Backwards compatibility
 params["fsPath"] = params["versionsPath"]
 versionsFs := versionerFsFromFolderCfg(cfg)

 s := &staggered{
-cleanInterval: cleanInterval,
-folderFs: cfg.Filesystem(),
-versionsFs: versionsFs,
+folderFs: cfg.Filesystem(),
+versionsFs: versionsFs,
 interval: [4]interval{
 {30, 60 * 60}, // first hour -> 30 sec between versions
 {60 * 60, 24 * 60 * 60}, // next day -> 1 h between versions
@@ -69,41 +55,18 @@ func newStaggered(cfg config.FolderConfiguration) Versioner {
 {7 * 24 * 60 * 60, maxAge}, // next year -> 1 week between versions
 },
 copyRangeMethod: cfg.CopyRangeMethod,
-mutex: sync.NewMutex(),
 }
-s.Service = util.AsService(s.serve, s.String())

 l.Debugf("instantiated %#v", s)
 return s
 }

-func (v *staggered) serve(ctx context.Context) {
-v.clean()
-if v.testCleanDone != nil {
-close(v.testCleanDone)
-}
-
-tck := time.NewTicker(time.Duration(v.cleanInterval) * time.Second)
-defer tck.Stop()
-for {
-select {
-case <-tck.C:
-v.clean()
-case <-ctx.Done():
-return
-}
-}
-}
-
-func (v *staggered) clean() {
-l.Debugln("Versioner clean: Waiting for lock on", v.versionsFs)
-v.mutex.Lock()
-defer v.mutex.Unlock()
+func (v *staggered) Clean(ctx context.Context) error {
 l.Debugln("Versioner clean: Cleaning", v.versionsFs)

 if _, err := v.versionsFs.Stat("."); fs.IsNotExist(err) {
 // There is no need to clean a nonexistent dir.
-return
+return nil
 }

 versionsPerFile := make(map[string][]string)
@@ -113,6 +76,11 @@ func (v *staggered) clean() {
 if err != nil {
 return err
 }
+select {
+case <-ctx.Done():
+return ctx.Err()
+default:
+}

 if f.IsDir() && !f.IsSymlink() {
 dirTracker.addDir(path)
@@ -134,16 +102,22 @@

 if err := v.versionsFs.Walk(".", walkFn); err != nil {
 l.Warnln("Versioner: error scanning versions dir", err)
-return
+return err
 }

 for _, versionList := range versionsPerFile {
+select {
+case <-ctx.Done():
+return ctx.Err()
+default:
+}
 v.expire(versionList)
 }

 dirTracker.deleteEmptyDirs(v.versionsFs)

 l.Debugln("Cleaner: Finished cleaning", v.versionsFs)
+return nil
 }

 func (v *staggered) expire(versions []string) {
@@ -216,10 +190,6 @@ func (v *staggered) toRemove(versions []string, now time.Time) []string {
 // Archive moves the named file away to a version archive. If this function
 // returns nil, the named file does not exist any more (has been archived).
 func (v *staggered) Archive(filePath string) error {
-l.Debugln("Waiting for lock on ", v.versionsFs)
-v.mutex.Lock()
-defer v.mutex.Unlock()
-
 if err := archiveFile(v.copyRangeMethod, v.folderFs, v.versionsFs, filePath, TagFilename); err != nil {
 return err
 }
@@ -12,11 +12,8 @@ import (
 "strconv"
 "time"

-"github.com/thejerf/suture"
-
 "github.com/syncthing/syncthing/lib/config"
 "github.com/syncthing/syncthing/lib/fs"
-"github.com/syncthing/syncthing/lib/util"
 )

 func init() {
@@ -25,7 +22,6 @@ func init() {
 }

 type trashcan struct {
-suture.Service
 folderFs fs.Filesystem
 versionsFs fs.Filesystem
 cleanoutDays int
@@ -42,7 +38,6 @@ func newTrashcan(cfg config.FolderConfiguration) Versioner {
 cleanoutDays: cleanoutDays,
 copyRangeMethod: cfg.CopyRangeMethod,
 }
-s.Service = util.AsService(s.serve, s.String())

 l.Debugf("instantiated %#v", s)
 return s
@@ -56,37 +51,16 @@ func (t *trashcan) Archive(filePath string) error {
 })
 }

-func (t *trashcan) serve(ctx context.Context) {
-l.Debugln(t, "starting")
-defer l.Debugln(t, "stopping")
-
-// Do the first cleanup one minute after startup.
-timer := time.NewTimer(time.Minute)
-defer timer.Stop()
-
-for {
-select {
-case <-ctx.Done():
-return
-
-case <-timer.C:
-if t.cleanoutDays > 0 {
-if err := t.cleanoutArchive(); err != nil {
-l.Infoln("Cleaning trashcan:", err)
-}
-}
-
-// Cleanups once a day should be enough.
-timer.Reset(24 * time.Hour)
-}
-}
-}
-
 func (t *trashcan) String() string {
 return fmt.Sprintf("trashcan@%p", t)
 }

-func (t *trashcan) cleanoutArchive() error {
+func (t *trashcan) Clean(ctx context.Context) error {
+if t.cleanoutDays <= 0 {
+// no cleanout requested
+return nil
+}
+
 if _, err := t.versionsFs.Lstat("."); fs.IsNotExist(err) {
 return nil
 }
@@ -98,6 +72,11 @@ func (t *trashcan) cleanoutArchive() error {
 if err != nil {
 return err
 }
+select {
+case <-ctx.Done():
+return ctx.Err()
+default:
+}

 if info.IsDir() && !info.IsSymlink() {
 dirTracker.addDir(path)
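Both the staggered and trashcan Clean implementations above check ctx.Done() in a non-blocking select between units of work, so a long cleanup can stop promptly when the folder shuts down. A minimal, hypothetical illustration of that cancellation pattern over an arbitrary work list (the function and item names are made up; only the select idiom mirrors the diff):

package main

import (
	"context"
	"fmt"
	"time"
)

// processAll runs one unit of work per item, checking for cancellation
// between items instead of blocking on ctx.Done().
func processAll(ctx context.Context, items []string) error {
	for _, item := range items {
		select {
		case <-ctx.Done():
			// Stop promptly and report why (context.Canceled or DeadlineExceeded).
			return ctx.Err()
		default:
		}
		fmt.Println("processing", item)
		time.Sleep(50 * time.Millisecond) // stand-in for expiring one file's versions
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
	defer cancel()

	err := processAll(ctx, []string{"a", "b", "c", "d", "e"})
	fmt.Println("finished with:", err)
}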
@@ -7,13 +7,14 @@
 package versioner

 import (
-"github.com/syncthing/syncthing/lib/config"
+"context"
 "io/ioutil"
 "os"
 "path/filepath"
 "testing"
 "time"

+"github.com/syncthing/syncthing/lib/config"
 "github.com/syncthing/syncthing/lib/fs"
 )

@@ -65,7 +66,7 @@ func TestTrashcanCleanout(t *testing.T) {
 }

 versioner := newTrashcan(cfg).(*trashcan)
-if err := versioner.cleanoutArchive(); err != nil {
+if err := versioner.Clean(context.Background()); err != nil {
 t.Fatal(err)
 }
@@ -9,6 +9,7 @@
 package versioner

 import (
+"context"
 "errors"
 "fmt"
 "time"
@@ -20,6 +21,7 @@ type Versioner interface {
 Archive(filePath string) error
 GetVersions() (map[string][]FileVersion, error)
 Restore(filePath string, versionTime time.Time) error
+Clean(context.Context) error
 }

 type FileVersion struct {