From 73f9c7d174d04b06a0f2c3a9d4e46f999108c9fe Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Tue, 7 Feb 2017 14:31:13 +0100 Subject: [PATCH] lib/versioner: Convert Staggered to a suture.Service (fixes #3820) --- lib/versioner/staggered.go | 52 ++++++++++++++++++++------------- lib/versioner/staggered_test.go | 9 ++++-- 2 files changed, 37 insertions(+), 24 deletions(-) diff --git a/lib/versioner/staggered.go b/lib/versioner/staggered.go index bfe8099bb..0e346c70d 100644 --- a/lib/versioner/staggered.go +++ b/lib/versioner/staggered.go @@ -33,9 +33,10 @@ type Staggered struct { folderPath string interval [4]Interval mutex sync.Mutex -} -var testCleanDone chan struct{} + stop chan struct{} + testCleanDone chan struct{} +} func NewStaggered(folderID, folderPath string, params map[string]string) Versioner { maxAge, err := strconv.ParseInt(params["maxAge"], 10, 0) @@ -57,7 +58,7 @@ func NewStaggered(folderID, folderPath string, params map[string]string) Version versionsDir = params["versionsPath"] } - s := Staggered{ + s := &Staggered{ versionsPath: versionsDir, cleanInterval: cleanInterval, folderPath: folderPath, @@ -68,27 +69,36 @@ func NewStaggered(folderID, folderPath string, params map[string]string) Version {604800, maxAge}, // next year -> 1 week between versions }, mutex: sync.NewMutex(), + stop: make(chan struct{}), } l.Debugf("instantiated %#v", s) - - go func() { - // TODO: This should be converted to a Serve() method. - s.clean() - if testCleanDone != nil { - close(testCleanDone) - } - tck := time.NewTicker(time.Duration(cleanInterval) * time.Second) - defer tck.Stop() - for range tck.C { - s.clean() - } - }() - return s } -func (v Staggered) clean() { +func (v *Staggered) Serve() { + v.clean() + if v.testCleanDone != nil { + close(v.testCleanDone) + } + + tck := time.NewTicker(time.Duration(v.cleanInterval) * time.Second) + defer tck.Stop() + for { + select { + case <-tck.C: + v.clean() + case <-v.stop: + return + } + } +} + +func (v *Staggered) Stop() { + close(v.stop) +} + +func (v *Staggered) clean() { l.Debugln("Versioner clean: Waiting for lock on", v.versionsPath) v.mutex.Lock() defer v.mutex.Unlock() @@ -157,7 +167,7 @@ func (v Staggered) clean() { l.Debugln("Cleaner: Finished cleaning", v.versionsPath) } -func (v Staggered) expire(versions []string) { +func (v *Staggered) expire(versions []string) { l.Debugln("Versioner: Expiring versions", versions) for _, file := range v.toRemove(versions, time.Now()) { if fi, err := osutil.Lstat(file); err != nil { @@ -174,7 +184,7 @@ func (v Staggered) expire(versions []string) { } } -func (v Staggered) toRemove(versions []string, now time.Time) []string { +func (v *Staggered) toRemove(versions []string, now time.Time) []string { var prevAge int64 firstFile := true var remove []string @@ -226,7 +236,7 @@ func (v Staggered) toRemove(versions []string, now time.Time) []string { // Archive moves the named file away to a version archive. If this function // returns nil, the named file does not exist any more (has been archived). -func (v Staggered) Archive(filePath string) error { +func (v *Staggered) Archive(filePath string) error { l.Debugln("Waiting for lock on ", v.versionsPath) v.mutex.Lock() defer v.mutex.Unlock() diff --git a/lib/versioner/staggered_test.go b/lib/versioner/staggered_test.go index f3c7d957c..da4235425 100644 --- a/lib/versioner/staggered_test.go +++ b/lib/versioner/staggered_test.go @@ -62,9 +62,12 @@ func TestStaggeredVersioningVersionCount(t *testing.T) { os.MkdirAll("testdata/.stversions", 0755) defer os.RemoveAll("testdata") - testCleanDone = make(chan struct{}) - v := NewStaggered("", "testdata", map[string]string{"maxAge": strconv.Itoa(365 * 86400)}).(Staggered) - <-testCleanDone + v := NewStaggered("", "testdata", map[string]string{"maxAge": strconv.Itoa(365 * 86400)}).(*Staggered) + v.testCleanDone = make(chan struct{}) + defer v.Stop() + go v.Serve() + + <-v.testCleanDone rem := v.toRemove(files, now) if diff, equal := messagediff.PrettyDiff(delete, rem); !equal {