Mirror of https://github.com/octoleo/syncthing.git (synced 2024-11-14 01:04:14 +00:00)
lib/model, cmd/syncthing: Wait for folder restarts to complete (fixes #5233)

This is the somewhat ugly - but on the other hand clear - fix for what is
really a somewhat thorny issue. To avoid zombie folder runners, a new mutex
is introduced that protects the RestartFolder operation. I hate adding more
mutexes, but the alternatives I can think of are worse.

The other part of it is that the POST /rest/system/config operation now
waits for the config commit to complete. The point of this is that until
the commit has completed we should not accept another config commit. If we
did, we could end up with two separate RestartFolders queued in the
background. While they are both correct and will run without interfering
with each other, we can't guarantee the order in which they will run. Thus
it could happen that the newer config got committed first, and the older
config committed after that, leaving us with the wrong config running.

Squashed commits:
* test
* wip
* hax
* hax
* unflake test
* per folder mutexes
* paranoia
* race
This commit is contained in:
parent c9d6366d75
commit f528923d1e
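The core of the fix is easiest to see in isolation. Below is a minimal, self-contained Go sketch of the per-folder restart mutex described above; the names here (mutexMap, model, restartFolder) are invented for illustration and are not Syncthing's actual code, which follows in the diff.

package main

import (
	"fmt"
	"sync"
)

// mutexMap hands out one mutex per key, creating it on first use
// (the same idea as the syncMutexMap added to lib/model below).
type mutexMap struct {
	inner sync.Map
}

func (m *mutexMap) Get(key string) *sync.Mutex {
	v, _ := m.inner.LoadOrStore(key, &sync.Mutex{})
	return v.(*sync.Mutex)
}

type model struct {
	restartMuts mutexMap
	running     int // stand-in for the count of live folder runners
}

// restartFolder stands in for Model.RestartFolder: the whole stop/start
// cycle runs under the folder's own restart mutex, so two concurrent
// restarts of the same folder cannot interleave.
func (m *model) restartFolder(folderID string) {
	mut := m.restartMuts.Get(folderID)
	mut.Lock()
	defer mut.Unlock()

	m.running-- // tear down the old runner
	m.running++ // start the new one
}

func main() {
	m := &model{running: 1}

	var wg sync.WaitGroup
	for i := 0; i < 25; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.restartFolder("default")
		}()
	}
	wg.Wait()

	fmt.Println("runners:", m.running) // stays 1: every teardown is paired with exactly one start
}

Concurrent restarts of the same folder serialize on the same mutex, so a stop is always followed by its matching start before the next restart begins, which is what keeps zombie runners from accumulating.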
@@ -898,12 +898,15 @@ func (s *apiService) postSystemConfig(w http.ResponseWriter, r *http.Request) {
 		}
 	}
 
-	// Activate and save
+	// Activate and save. Wait for the configuration to become active before
+	// completing the request.
 
-	if _, err := s.cfg.Replace(to); err != nil {
+	if wg, err := s.cfg.Replace(to); err != nil {
 		l.Warnln("Replacing config:", err)
 		http.Error(w, err.Error(), http.StatusInternalServerError)
 		return
+	} else {
+		wg.Wait()
 	}
 
 	if err := s.cfg.Save(); err != nil {
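For context on the wg.Wait() above: Replace is expected to return a config.Waiter whose Wait blocks until the new configuration has been committed, so the handler does not return (and cannot accept a second commit) before the first one has taken effect. The following is an illustrative sketch of one way such a waiter could be built on a sync.WaitGroup; the types here (Config, Committer, wrapper) are invented for the example and do not reflect the actual lib/config implementation.

package main

import (
	"fmt"
	"sync"
	"time"
)

type Config struct{ MaxConflicts int }

// Waiter mirrors the shape of config.Waiter: Wait blocks until the commit is done.
type Waiter interface{ Wait() }

// Committer is anything that reacts to a new configuration, e.g. by restarting a folder.
type Committer interface{ CommitConfiguration(to Config) }

type wrapper struct {
	mut        sync.Mutex
	current    Config
	committers []Committer
}

// Replace installs the new config and notifies every committer in the
// background; the returned Waiter is released once all of them are done.
func (w *wrapper) Replace(to Config) Waiter {
	w.mut.Lock()
	w.current = to
	committers := append([]Committer(nil), w.committers...)
	w.mut.Unlock()

	wg := &sync.WaitGroup{}
	for _, c := range committers {
		wg.Add(1)
		go func(c Committer) {
			defer wg.Done()
			c.CommitConfiguration(to)
		}(c)
	}
	return wg // *sync.WaitGroup has Wait(), so it satisfies Waiter
}

type slowCommitter struct{}

func (slowCommitter) CommitConfiguration(to Config) {
	time.Sleep(50 * time.Millisecond) // stand-in for a folder restart
	fmt.Println("committed config with MaxConflicts =", to.MaxConflicts)
}

func main() {
	w := &wrapper{committers: []Committer{slowCommitter{}}}
	// Like the POST /rest/system/config handler: wait for the commit to
	// complete before considering the request done.
	w.Replace(Config{MaxConflicts: 7}).Wait()
	fmt.Println("request can complete now")
}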
@@ -39,7 +39,7 @@ func (c *mockedConfig) Options() config.OptionsConfiguration {
 }
 
 func (c *mockedConfig) Replace(cfg config.Configuration) (config.Waiter, error) {
-	return nil, nil
+	return noopWaiter{}, nil
 }
 
 func (c *mockedConfig) Subscribe(cm config.Committer) {}
@@ -53,11 +53,11 @@ func (c *mockedConfig) Devices() map[protocol.DeviceID]config.DeviceConfiguration {
 }
 
 func (c *mockedConfig) SetDevice(config.DeviceConfiguration) (config.Waiter, error) {
-	return nil, nil
+	return noopWaiter{}, nil
 }
 
 func (c *mockedConfig) SetDevices([]config.DeviceConfiguration) (config.Waiter, error) {
-	return nil, nil
+	return noopWaiter{}, nil
 }
 
 func (c *mockedConfig) Save() error {
@@ -67,3 +67,7 @@ func (c *mockedConfig) Save() error {
 func (c *mockedConfig) RequiresRestart() bool {
 	return false
 }
+
+type noopWaiter struct{}
+
+func (noopWaiter) Wait() {}
@@ -11,6 +11,7 @@ import (
 	"errors"
 	"fmt"
 	"math/rand"
+	"sync/atomic"
 	"time"
 
 	"github.com/syncthing/syncthing/lib/config"
@@ -89,6 +90,9 @@ func newFolder(model *Model, cfg config.FolderConfiguration) folder {
 }
 
 func (f *folder) Serve() {
+	atomic.AddInt32(&f.model.foldersRunning, 1)
+	defer atomic.AddInt32(&f.model.foldersRunning, -1)
+
 	l.Debugln(f, "starting")
 	defer l.Debugln(f, "exiting")
 
@@ -20,6 +20,7 @@ import (
 	"runtime"
 	"sort"
 	"strings"
+	stdsync "sync"
 	"time"
 
 	"github.com/syncthing/syncthing/lib/config"
@@ -94,6 +95,7 @@ type Model struct {
 	clientName    string
 	clientVersion string
 
+	fmut           sync.RWMutex                                            // protects the below
 	folderCfgs     map[string]config.FolderConfiguration                   // folder -> cfg
 	folderFiles    map[string]*db.FileSet                                  // folder -> files
 	deviceStatRefs map[protocol.DeviceID]*stats.DeviceStatisticsReference // deviceID -> statsRef
@@ -101,14 +103,16 @@ type Model struct {
 	folderRunners      map[string]service                          // folder -> puller or scanner
 	folderRunnerTokens map[string][]suture.ServiceToken            // folder -> tokens for puller or scanner
 	folderStatRefs     map[string]*stats.FolderStatisticsReference // folder -> statsRef
-	fmut               sync.RWMutex                                // protects the above
+	folderRestartMuts  syncMutexMap                                // folder -> restart mutex
 
+	pmut                sync.RWMutex // protects the below
 	conn                map[protocol.DeviceID]connections.Connection
 	closed              map[protocol.DeviceID]chan struct{}
 	helloMessages       map[protocol.DeviceID]protocol.HelloResult
 	deviceDownloads     map[protocol.DeviceID]*deviceDownloadState
 	remotePausedFolders map[protocol.DeviceID][]string // deviceID -> folders
-	pmut                sync.RWMutex                   // protects the above
+
+	foldersRunning int32 // for testing only
 }
 
 type folderFactory func(*Model, config.FolderConfiguration, versioner.Versioner, fs.Filesystem) service
@@ -372,11 +376,26 @@ func (m *Model) tearDownFolderLocked(cfg config.FolderConfiguration) {
 
 func (m *Model) RestartFolder(from, to config.FolderConfiguration) {
 	if len(to.ID) == 0 {
-		panic("cannot add empty folder id")
+		panic("bug: cannot restart empty folder ID")
 	}
+	if to.ID != from.ID {
+		panic(fmt.Sprintf("bug: folder restart cannot change ID %q -> %q", from.ID, to.ID))
+	}
+
+	// This mutex protects the entirety of the restart operation, preventing
+	// there from being more than one folder restart operation in progress
+	// at any given time. The usual fmut/pmut stuff doesn't cover this,
+	// because those locks are released while we are waiting for the folder
+	// to shut down (and must be so because the folder might need them as
+	// part of its operations before shutting down).
+	restartMut := m.folderRestartMuts.Get(to.ID)
+	restartMut.Lock()
+	defer restartMut.Unlock()
 
 	m.fmut.Lock()
 	m.pmut.Lock()
+	defer m.fmut.Unlock()
+	defer m.pmut.Unlock()
 
 	m.tearDownFolderLocked(from)
 	if to.Paused {
@@ -386,9 +405,6 @@ func (m *Model) RestartFolder(from, to config.FolderConfiguration) {
 		folderType := m.startFolderLocked(to.ID)
 		l.Infoln("Restarted folder", to.Description(), fmt.Sprintf("(%s)", folderType))
 	}
-
-	m.pmut.Unlock()
-	m.fmut.Unlock()
 }
 
 func (m *Model) UsageReportingStats(version int, preview bool) map[string]interface{} {
@@ -2976,3 +2992,13 @@ func (b *fileInfoBatch) reset() {
 	b.infos = b.infos[:0]
 	b.size = 0
 }
+
+// syncMutexMap is a type safe wrapper for a sync.Map that holds mutexes
+type syncMutexMap struct {
+	inner stdsync.Map
+}
+
+func (m *syncMutexMap) Get(key string) sync.Mutex {
+	v, _ := m.inner.LoadOrStore(key, sync.NewMutex())
+	return v.(sync.Mutex)
+}
@@ -20,6 +20,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -3850,6 +3851,59 @@ func addFakeConn(m *Model, dev protocol.DeviceID) *fakeConnection {
 	return fc
 }
 
+func TestFolderRestartZombies(t *testing.T) {
+	// This is for issue 5233, where multiple concurrent folder restarts
+	// would leave more than one folder runner alive.
+
+	wrapper := createTmpWrapper(defaultCfg.Copy())
+	folderCfg, _ := wrapper.Folder("default")
+	folderCfg.FilesystemType = fs.FilesystemTypeFake
+	wrapper.SetFolder(folderCfg)
+
+	db := db.OpenMemory()
+	m := NewModel(wrapper, protocol.LocalDeviceID, "syncthing", "dev", db, nil)
+	m.AddFolder(folderCfg)
+	m.StartFolder("default")
+
+	m.ServeBackground()
+	defer m.Stop()
+
+	// Make sure the folder is up and running, because we want to count it.
+	m.ScanFolder("default")
+
+	// Check how many running folders we have running before the test.
+	if r := atomic.LoadInt32(&m.foldersRunning); r != 1 {
+		t.Error("Expected one running folder, not", r)
+	}
+
+	// Run a few parallel configuration changers for one second. Each waits
+	// for the commit to complete, but there are many of them.
+	var wg sync.WaitGroup
+	for i := 0; i < 25; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			t0 := time.Now()
+			for time.Since(t0) < time.Second {
+				cfg := folderCfg.Copy()
+				cfg.MaxConflicts = rand.Int() // safe change that should cause a folder restart
+				w, err := wrapper.SetFolder(cfg)
+				if err != nil {
+					panic(err)
+				}
+				w.Wait()
+			}
+		}()
+	}
+
+	// Wait for the above to complete and check how many folders we have
+	// running now. It should not have increased.
+	wg.Wait()
+	if r := atomic.LoadInt32(&m.foldersRunning); r != 1 {
+		t.Error("Expected one running folder, not", r)
+	}
+}
+
 type fakeAddr struct{}
 
 func (fakeAddr) Network() string {