Merge branch 'pr-1748'

* pr-1748:
  Reschedule before scan
  Use a channel instead of locks
  Reschedule the next scan interval (fixes #1591)
This commit is contained in:
Jakob Borg 2015-05-04 10:40:14 +02:00
commit ff4706e450
7 changed files with 174 additions and 28 deletions

View File

@ -155,7 +155,7 @@ func (s *apiSvc) Serve() {
postRestMux.HandleFunc("/rest/db/prio", s.postDBPrio) // folder file [perpage] [page] postRestMux.HandleFunc("/rest/db/prio", s.postDBPrio) // folder file [perpage] [page]
postRestMux.HandleFunc("/rest/db/ignores", s.postDBIgnores) // folder postRestMux.HandleFunc("/rest/db/ignores", s.postDBIgnores) // folder
postRestMux.HandleFunc("/rest/db/override", s.postDBOverride) // folder postRestMux.HandleFunc("/rest/db/override", s.postDBOverride) // folder
postRestMux.HandleFunc("/rest/db/scan", s.postDBScan) // folder [sub...] postRestMux.HandleFunc("/rest/db/scan", s.postDBScan) // folder [sub...] [delay]
postRestMux.HandleFunc("/rest/system/config", s.postSystemConfig) // <body> postRestMux.HandleFunc("/rest/system/config", s.postSystemConfig) // <body>
postRestMux.HandleFunc("/rest/system/discovery", s.postSystemDiscovery) // device addr postRestMux.HandleFunc("/rest/system/discovery", s.postSystemDiscovery) // device addr
postRestMux.HandleFunc("/rest/system/error", s.postSystemError) // <body> postRestMux.HandleFunc("/rest/system/error", s.postSystemError) // <body>
@ -775,16 +775,24 @@ func (s *apiSvc) postDBScan(w http.ResponseWriter, r *http.Request) {
qs := r.URL.Query() qs := r.URL.Query()
folder := qs.Get("folder") folder := qs.Get("folder")
if folder != "" { if folder != "" {
nextStr := qs.Get("next")
next, err := strconv.Atoi(nextStr)
if err == nil {
s.model.DelayScan(folder, time.Duration(next)*time.Second)
}
subs := qs["sub"] subs := qs["sub"]
err := s.model.ScanFolderSubs(folder, subs) err = s.model.ScanFolderSubs(folder, subs)
if err != nil { if err != nil {
http.Error(w, err.Error(), 500) http.Error(w, err.Error(), 500)
return
} }
} else { } else {
errors := s.model.ScanFolders() errors := s.model.ScanFolders()
if len(errors) > 0 { if len(errors) > 0 {
http.Error(w, "Error scanning folders", 500) http.Error(w, "Error scanning folders", 500)
json.NewEncoder(w).Encode(errors) json.NewEncoder(w).Encode(errors)
return
} }
} }
} }

Binary file not shown.

View File

@ -49,6 +49,7 @@ type service interface {
Stop() Stop()
Jobs() ([]string, []string) // In progress, Queued Jobs() ([]string, []string) // In progress, Queued
BringToFront(string) BringToFront(string)
DelayScan(d time.Duration)
setState(state folderState) setState(state folderState)
setError(err error) setError(err error)
@ -1322,6 +1323,16 @@ nextSub:
return nil return nil
} }
func (m *Model) DelayScan(folder string, next time.Duration) {
m.fmut.Lock()
runner, ok := m.folderRunners[folder]
m.fmut.Unlock()
if !ok {
return
}
runner.DelayScan(next)
}
// numHashers returns the number of hasher routines to use for a given folder, // numHashers returns the number of hasher routines to use for a given folder,
// taking into account configuration and available CPU cores. // taking into account configuration and available CPU cores.
func (m *Model) numHashers(folder string) int { func (m *Model) numHashers(folder string) int {

View File

@ -17,10 +17,12 @@ import (
type roFolder struct { type roFolder struct {
stateTracker stateTracker
folder string folder string
intv time.Duration intv time.Duration
model *Model timer *time.Timer
stop chan struct{} model *Model
stop chan struct{}
delayScan chan time.Duration
} }
func newROFolder(model *Model, folder string, interval time.Duration) *roFolder { func newROFolder(model *Model, folder string, interval time.Duration) *roFolder {
@ -29,10 +31,12 @@ func newROFolder(model *Model, folder string, interval time.Duration) *roFolder
folder: folder, folder: folder,
mut: sync.NewMutex(), mut: sync.NewMutex(),
}, },
folder: folder, folder: folder,
intv: interval, intv: interval,
model: model, timer: time.NewTimer(time.Millisecond),
stop: make(chan struct{}), model: model,
stop: make(chan struct{}),
delayScan: make(chan time.Duration),
} }
} }
@ -42,13 +46,14 @@ func (s *roFolder) Serve() {
defer l.Debugln(s, "exiting") defer l.Debugln(s, "exiting")
} }
timer := time.NewTimer(time.Millisecond) defer func() {
defer timer.Stop() s.timer.Stop()
}()
reschedule := func() { reschedule := func() {
// Sleep a random time between 3/4 and 5/4 of the configured interval. // Sleep a random time between 3/4 and 5/4 of the configured interval.
sleepNanos := (s.intv.Nanoseconds()*3 + rand.Int63n(2*s.intv.Nanoseconds())) / 4 sleepNanos := (s.intv.Nanoseconds()*3 + rand.Int63n(2*s.intv.Nanoseconds())) / 4
timer.Reset(time.Duration(sleepNanos) * time.Nanosecond) s.timer.Reset(time.Duration(sleepNanos) * time.Nanosecond)
} }
initialScanCompleted := false initialScanCompleted := false
@ -57,7 +62,7 @@ func (s *roFolder) Serve() {
case <-s.stop: case <-s.stop:
return return
case <-timer.C: case <-s.timer.C:
if err := s.model.CheckFolderHealth(s.folder); err != nil { if err := s.model.CheckFolderHealth(s.folder); err != nil {
l.Infoln("Skipping folder", s.folder, "scan due to folder error:", err) l.Infoln("Skipping folder", s.folder, "scan due to folder error:", err)
reschedule() reschedule()
@ -88,6 +93,9 @@ func (s *roFolder) Serve() {
} }
reschedule() reschedule()
case next := <-s.delayScan:
s.timer.Reset(next)
} }
} }
} }
@ -105,3 +113,7 @@ func (s *roFolder) BringToFront(string) {}
func (s *roFolder) Jobs() ([]string, []string) { func (s *roFolder) Jobs() ([]string, []string) {
return nil, nil return nil, nil
} }
func (s *roFolder) DelayScan(next time.Duration) {
s.delayScan <- next
}

View File

@ -74,6 +74,9 @@ type rwFolder struct {
stop chan struct{} stop chan struct{}
queue *jobQueue queue *jobQueue
dbUpdates chan protocol.FileInfo dbUpdates chan protocol.FileInfo
scanTimer *time.Timer
pullTimer *time.Timer
delayScan chan time.Duration
} }
func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFolder { func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFolder {
@ -96,8 +99,11 @@ func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFo
shortID: shortID, shortID: shortID,
order: cfg.Order, order: cfg.Order,
stop: make(chan struct{}), stop: make(chan struct{}),
queue: newJobQueue(), queue: newJobQueue(),
pullTimer: time.NewTimer(checkPullIntv),
scanTimer: time.NewTimer(time.Millisecond), // The first scan should be done immediately.
delayScan: make(chan time.Duration),
} }
} }
@ -109,12 +115,9 @@ func (p *rwFolder) Serve() {
defer l.Debugln(p, "exiting") defer l.Debugln(p, "exiting")
} }
pullTimer := time.NewTimer(checkPullIntv)
scanTimer := time.NewTimer(time.Millisecond) // The first scan should be done immediately.
defer func() { defer func() {
pullTimer.Stop() p.pullTimer.Stop()
scanTimer.Stop() p.scanTimer.Stop()
// TODO: Should there be an actual FolderStopped state? // TODO: Should there be an actual FolderStopped state?
p.setState(FolderIdle) p.setState(FolderIdle)
}() }()
@ -135,7 +138,7 @@ func (p *rwFolder) Serve() {
if debug { if debug {
l.Debugln(p, "next rescan in", intv) l.Debugln(p, "next rescan in", intv)
} }
scanTimer.Reset(intv) p.scanTimer.Reset(intv)
} }
// We don't start pulling files until a scan has been completed. // We don't start pulling files until a scan has been completed.
@ -151,12 +154,12 @@ func (p *rwFolder) Serve() {
// information is available. Before that though, I'd like to build a // information is available. Before that though, I'd like to build a
// repeatable benchmark of how long it takes to sync a change from // repeatable benchmark of how long it takes to sync a change from
// device A to device B, so we have something to work against. // device A to device B, so we have something to work against.
case <-pullTimer.C: case <-p.pullTimer.C:
if !initialScanCompleted { if !initialScanCompleted {
if debug { if debug {
l.Debugln(p, "skip (initial)") l.Debugln(p, "skip (initial)")
} }
pullTimer.Reset(nextPullIntv) p.pullTimer.Reset(nextPullIntv)
continue continue
} }
@ -180,7 +183,7 @@ func (p *rwFolder) Serve() {
if debug { if debug {
l.Debugln(p, "skip (curVer == prevVer)", prevVer) l.Debugln(p, "skip (curVer == prevVer)", prevVer)
} }
pullTimer.Reset(checkPullIntv) p.pullTimer.Reset(checkPullIntv)
continue continue
} }
@ -218,7 +221,7 @@ func (p *rwFolder) Serve() {
if debug { if debug {
l.Debugln(p, "next pull in", nextPullIntv) l.Debugln(p, "next pull in", nextPullIntv)
} }
pullTimer.Reset(nextPullIntv) p.pullTimer.Reset(nextPullIntv)
break break
} }
@ -231,7 +234,7 @@ func (p *rwFolder) Serve() {
if debug { if debug {
l.Debugln(p, "next pull in", pauseIntv) l.Debugln(p, "next pull in", pauseIntv)
} }
pullTimer.Reset(pauseIntv) p.pullTimer.Reset(pauseIntv)
break break
} }
} }
@ -240,7 +243,7 @@ func (p *rwFolder) Serve() {
// The reason for running the scanner from within the puller is that // The reason for running the scanner from within the puller is that
// this is the easiest way to make sure we are not doing both at the // this is the easiest way to make sure we are not doing both at the
// same time. // same time.
case <-scanTimer.C: case <-p.scanTimer.C:
if err := p.model.CheckFolderHealth(p.folder); err != nil { if err := p.model.CheckFolderHealth(p.folder); err != nil {
l.Infoln("Skipping folder", p.folder, "scan due to folder error:", err) l.Infoln("Skipping folder", p.folder, "scan due to folder error:", err)
rescheduleScan() rescheduleScan()
@ -268,6 +271,9 @@ func (p *rwFolder) Serve() {
l.Infoln("Completed initial scan (rw) of folder", p.folder) l.Infoln("Completed initial scan (rw) of folder", p.folder)
initialScanCompleted = true initialScanCompleted = true
} }
case next := <-p.delayScan:
p.scanTimer.Reset(next)
} }
} }
} }
@ -1165,6 +1171,10 @@ func (p *rwFolder) Jobs() ([]string, []string) {
return p.queue.Jobs() return p.queue.Jobs()
} }
func (p *rwFolder) DelayScan(next time.Duration) {
p.delayScan <- next
}
// dbUpdaterRoutine aggregates db updates and commits them in batches no // dbUpdaterRoutine aggregates db updates and commits them in batches no
// larger than 1000 items, and no more delayed than 2 seconds. // larger than 1000 items, and no more delayed than 2 seconds.
func (p *rwFolder) dbUpdaterRoutine() { func (p *rwFolder) dbUpdaterRoutine() {

91
test/delay_scan_test.go Normal file
View File

@ -0,0 +1,91 @@
// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
// +build integration
package integration
import (
"io/ioutil"
"log"
"sync"
"testing"
"time"
)
func TestDelayScan(t *testing.T) {
log.Println("Cleaning...")
err := removeAll("s1", "h1/index*")
if err != nil {
t.Fatal(err)
}
log.Println("Generating files...")
err = generateFiles("s1", 50, 18, "../LICENSE")
if err != nil {
t.Fatal(err)
}
log.Println("Generating .stignore...")
err = ioutil.WriteFile("s1/.stignore", []byte("some ignore data\n"), 0644)
if err != nil {
t.Fatal(err)
}
log.Println("Starting up...")
st := syncthingProcess{ // id1
instance: "1",
argv: []string{"-home", "h1"},
port: 8081,
apiKey: apiKey,
}
err = st.start()
if err != nil {
t.Fatal(err)
}
// Wait for one scan to succeed, or up to 20 seconds...
// This is to let startup, UPnP etc complete.
for i := 0; i < 20; i++ {
err := st.rescan("default")
if err != nil {
time.Sleep(time.Second)
continue
}
break
}
// Wait for UPnP and stuff
time.Sleep(10 * time.Second)
var wg sync.WaitGroup
log.Println("Starting scans...")
for j := 0; j < 20; j++ {
j := j
wg.Add(1)
go func() {
defer wg.Done()
err := st.rescanNext("default", time.Duration(1)*time.Second)
log.Println(j)
if err != nil {
log.Println(err)
t.Fatal(err)
}
}()
}
wg.Wait()
log.Println("Scans done")
time.Sleep(2 * time.Second)
// This is where the real test is currently, since stop() checks for data
// race output in the log.
log.Println("Stopping...")
_, err = st.stop()
if err != nil {
t.Fatal(err)
}
}

View File

@ -20,6 +20,7 @@ import (
"net/http" "net/http"
"os" "os"
"os/exec" "os/exec"
"strconv"
"time" "time"
"github.com/syncthing/protocol" "github.com/syncthing/protocol"
@ -322,6 +323,19 @@ func (p *syncthingProcess) rescan(folder string) error {
return nil return nil
} }
func (p *syncthingProcess) rescanNext(folder string, next time.Duration) error {
resp, err := p.post("/rest/db/scan?folder="+folder+"&next="+strconv.Itoa(int(next.Seconds())), nil)
if err != nil {
return err
}
data, _ := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if resp.StatusCode != 200 {
return fmt.Errorf("Rescan %q: status code %d: %s", folder, resp.StatusCode, data)
}
return nil
}
func (p *syncthingProcess) reset(folder string) error { func (p *syncthingProcess) reset(folder string) error {
resp, err := p.post("/rest/system/reset?folder="+folder, nil) resp, err := p.post("/rest/system/reset?folder="+folder, nil)
if err != nil { if err != nil {