More fine-grained locking in discovery cache (fixes #2667)
We only need to protect the integrity of the "finders" and "caches" slices, and for that we only need an RLock except while actually appending to them. The actual finders and caches are concurrency-safe on their own.
This commit is contained in:
parent 25b3c09f6a
commit 370b0fc5da
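To illustrate the locking scheme the commit message describes, here is a minimal, self-contained sketch of the pattern. It is not the actual syncthing code: the type and helper names (finder, addressCache, mux) are invented for illustration, and the real CachingMux uses syncthing's lib/sync wrapper, which is why the diff below constructs the mutex with sync.NewRWMutex() rather than using a zero-value stdlib mutex. The idea is that only the path that appends to the finders/caches slices takes the exclusive write lock, while all read-only paths take the shared read lock, so one slow finder no longer blocks the others.

package main

import (
	"fmt"
	"sync"
	"time"
)

// Hypothetical stand-ins for the real Finder and cache types, just to keep
// this sketch self-contained.
type finder func(device string) []string
type addressCache map[string][]string

// mux mirrors the shape of CachingMux: one RWMutex guards only the two slices.
type mux struct {
	mut     sync.RWMutex
	finders []finder
	caches  []addressCache
}

// Add appends to both slices, so it needs the exclusive write lock.
func (m *mux) Add(f finder, c addressCache) {
	m.mut.Lock()
	m.finders = append(m.finders, f)
	m.caches = append(m.caches, c)
	m.mut.Unlock()
}

// Lookup only reads the slices, so a shared read lock suffices; lookups and
// other readers (ChildErrors, Cache) can run concurrently even while one
// finder is slow, because the finders and caches are concurrency safe on
// their own.
func (m *mux) Lookup(device string) []string {
	m.mut.RLock()
	defer m.mut.RUnlock()

	var res []string
	for i, f := range m.finders {
		if hit, ok := m.caches[i][device]; ok {
			res = append(res, hit...)
			continue
		}
		res = append(res, f(device)...)
	}
	return res
}

func main() {
	m := &mux{}
	m.Add(func(string) []string {
		time.Sleep(10 * time.Millisecond) // pretend this finder is slow
		return []string{"tcp://192.0.2.1:22000"}
	}, addressCache{})
	fmt.Println(m.Lookup("device-1"))
}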
@@ -26,7 +26,7 @@ type CachingMux struct {
 	*suture.Supervisor
 	finders []cachedFinder
 	caches  []*cache
-	mut     sync.Mutex
+	mut     sync.RWMutex
 }
 
 // A cachedFinder is a Finder with associated cache timeouts.
@@ -54,7 +54,7 @@ type cachedError interface {
 func NewCachingMux() *CachingMux {
 	return &CachingMux{
 		Supervisor: suture.NewSimple("discover.cachingMux"),
-		mut:        sync.NewMutex(),
+		mut:        sync.NewRWMutex(),
 	}
 }
 
@@ -75,7 +75,7 @@ func (m *CachingMux) Add(finder Finder, cacheTime, negCacheTime time.Duration, p
 func (m *CachingMux) Lookup(deviceID protocol.DeviceID) (direct []string, relays []Relay, err error) {
 	var pdirect []prioritizedAddress
 
-	m.mut.Lock()
+	m.mut.RLock()
 	for i, finder := range m.finders {
 		if cacheEntry, ok := m.caches[i].Get(deviceID); ok {
 			// We have a cache entry. Lets see what it says.
@@ -129,7 +129,7 @@ func (m *CachingMux) Lookup(deviceID protocol.DeviceID) (direct []string, relays
 				m.caches[i].Set(deviceID, entry)
 			}
 		}
-	m.mut.Unlock()
+	m.mut.RUnlock()
 
 	direct = uniqueSortedAddrs(pdirect)
 	relays = uniqueSortedRelays(relays)
@@ -149,12 +149,12 @@ func (m *CachingMux) Error() error {
 }
 
 func (m *CachingMux) ChildErrors() map[string]error {
-	m.mut.Lock()
 	children := make(map[string]error, len(m.finders))
+	m.mut.RLock()
 	for _, f := range m.finders {
 		children[f.String()] = f.Error()
 	}
-	m.mut.Unlock()
+	m.mut.RUnlock()
 	return children
 }
 
@@ -163,7 +163,7 @@ func (m *CachingMux) Cache() map[protocol.DeviceID]CacheEntry {
 	// children's caches.
 	res := make(map[protocol.DeviceID]CacheEntry)
 
-	m.mut.Lock()
+	m.mut.RLock()
 	for i := range m.finders {
 		// Each finder[i] has a corresponding cache at cache[i]. Go through it
 		// and populate the total, if it's newer than what's already in there.
@@ -183,7 +183,7 @@ func (m *CachingMux) Cache() map[protocol.DeviceID]CacheEntry {
 			}
 		}
 	}
-	m.mut.Unlock()
+	m.mut.RUnlock()
 
 	return res
 }
@@ -91,3 +91,55 @@ func (f *fakeDiscovery) String() string {
 func (f *fakeDiscovery) Cache() map[protocol.DeviceID]CacheEntry {
 	return nil
 }
+
+func TestCacheSlowLookup(t *testing.T) {
+	c := NewCachingMux()
+	c.ServeBackground()
+	defer c.Stop()
+
+	// Add a slow discovery service.
+
+	started := make(chan struct{})
+	f1 := &slowDiscovery{time.Second, started}
+	c.Add(f1, time.Minute, 0, 0)
+
+	// Start a lookup, which will take at least a second
+
+	t0 := time.Now()
+	go c.Lookup(protocol.LocalDeviceID)
+	<-started // The slow lookup method has been called so we're inside the lock
+
+	// It should be possible to get ChildErrors while it's running
+
+	c.ChildErrors()
+
+	// Only a small amount of time should have passed, not the full second
+
+	diff := time.Since(t0)
+	if diff > 500*time.Millisecond {
+		t.Error("ChildErrors was blocked for", diff)
+	}
+}
+
+type slowDiscovery struct {
+	delay   time.Duration
+	started chan struct{}
+}
+
+func (f *slowDiscovery) Lookup(deviceID protocol.DeviceID) (direct []string, relays []Relay, err error) {
+	close(f.started)
+	time.Sleep(f.delay)
+	return nil, nil, nil
+}
+
+func (f *slowDiscovery) Error() error {
+	return nil
+}
+
+func (f *slowDiscovery) String() string {
+	return "fake"
+}
+
+func (f *slowDiscovery) Cache() map[protocol.DeviceID]CacheEntry {
+	return nil
+}
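As a usage note: with the old exclusive Mutex, ChildErrors would block behind the in-progress Lookup for the full second and the test's 500 ms budget would be exceeded, whereas with the RWMutex both calls hold read locks concurrently and the test passes. Assuming the package lives at lib/discover (the file paths are not shown in this diff view), the new test could be run on its own, with the race detector, via something like:

	go test -race -run TestCacheSlowLookup ./lib/discover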