Merge pull request #2676 from calmh/fix2667

More fine grained locking in discovery cache (fixes #2667)
This commit is contained in:
Audrius Butkevicius 2016-01-09 00:58:14 +00:00
commit 0d35fe0f21
2 changed files with 60 additions and 8 deletions

View File

@ -26,7 +26,7 @@ type CachingMux struct {
*suture.Supervisor *suture.Supervisor
finders []cachedFinder finders []cachedFinder
caches []*cache caches []*cache
mut sync.Mutex mut sync.RWMutex
} }
// A cachedFinder is a Finder with associated cache timeouts. // A cachedFinder is a Finder with associated cache timeouts.
@ -54,7 +54,7 @@ type cachedError interface {
func NewCachingMux() *CachingMux { func NewCachingMux() *CachingMux {
return &CachingMux{ return &CachingMux{
Supervisor: suture.NewSimple("discover.cachingMux"), Supervisor: suture.NewSimple("discover.cachingMux"),
mut: sync.NewMutex(), mut: sync.NewRWMutex(),
} }
} }
@ -75,7 +75,7 @@ func (m *CachingMux) Add(finder Finder, cacheTime, negCacheTime time.Duration, p
func (m *CachingMux) Lookup(deviceID protocol.DeviceID) (direct []string, relays []Relay, err error) { func (m *CachingMux) Lookup(deviceID protocol.DeviceID) (direct []string, relays []Relay, err error) {
var pdirect []prioritizedAddress var pdirect []prioritizedAddress
m.mut.Lock() m.mut.RLock()
for i, finder := range m.finders { for i, finder := range m.finders {
if cacheEntry, ok := m.caches[i].Get(deviceID); ok { if cacheEntry, ok := m.caches[i].Get(deviceID); ok {
// We have a cache entry. Lets see what it says. // We have a cache entry. Lets see what it says.
@ -129,7 +129,7 @@ func (m *CachingMux) Lookup(deviceID protocol.DeviceID) (direct []string, relays
m.caches[i].Set(deviceID, entry) m.caches[i].Set(deviceID, entry)
} }
} }
m.mut.Unlock() m.mut.RUnlock()
direct = uniqueSortedAddrs(pdirect) direct = uniqueSortedAddrs(pdirect)
relays = uniqueSortedRelays(relays) relays = uniqueSortedRelays(relays)
@ -149,12 +149,12 @@ func (m *CachingMux) Error() error {
} }
func (m *CachingMux) ChildErrors() map[string]error { func (m *CachingMux) ChildErrors() map[string]error {
m.mut.Lock()
children := make(map[string]error, len(m.finders)) children := make(map[string]error, len(m.finders))
m.mut.RLock()
for _, f := range m.finders { for _, f := range m.finders {
children[f.String()] = f.Error() children[f.String()] = f.Error()
} }
m.mut.Unlock() m.mut.RUnlock()
return children return children
} }
@ -163,7 +163,7 @@ func (m *CachingMux) Cache() map[protocol.DeviceID]CacheEntry {
// children's caches. // children's caches.
res := make(map[protocol.DeviceID]CacheEntry) res := make(map[protocol.DeviceID]CacheEntry)
m.mut.Lock() m.mut.RLock()
for i := range m.finders { for i := range m.finders {
// Each finder[i] has a corresponding cache at cache[i]. Go through it // Each finder[i] has a corresponding cache at cache[i]. Go through it
// and populate the total, if it's newer than what's already in there. // and populate the total, if it's newer than what's already in there.
@ -183,7 +183,7 @@ func (m *CachingMux) Cache() map[protocol.DeviceID]CacheEntry {
} }
} }
} }
m.mut.Unlock() m.mut.RUnlock()
return res return res
} }

View File

@ -91,3 +91,55 @@ func (f *fakeDiscovery) String() string {
func (f *fakeDiscovery) Cache() map[protocol.DeviceID]CacheEntry { func (f *fakeDiscovery) Cache() map[protocol.DeviceID]CacheEntry {
return nil return nil
} }
func TestCacheSlowLookup(t *testing.T) {
c := NewCachingMux()
c.ServeBackground()
defer c.Stop()
// Add a slow discovery service.
started := make(chan struct{})
f1 := &slowDiscovery{time.Second, started}
c.Add(f1, time.Minute, 0, 0)
// Start a lookup, which will take at least a second
t0 := time.Now()
go c.Lookup(protocol.LocalDeviceID)
<-started // The slow lookup method has been called so we're inside the lock
// It should be possible to get ChildErrors while it's running
c.ChildErrors()
// Only a small amount of time should have passed, not the full second
diff := time.Since(t0)
if diff > 500*time.Millisecond {
t.Error("ChildErrors was blocked for", diff)
}
}
type slowDiscovery struct {
delay time.Duration
started chan struct{}
}
func (f *slowDiscovery) Lookup(deviceID protocol.DeviceID) (direct []string, relays []Relay, err error) {
close(f.started)
time.Sleep(f.delay)
return nil, nil, nil
}
func (f *slowDiscovery) Error() error {
return nil
}
func (f *slowDiscovery) String() string {
return "fake"
}
func (f *slowDiscovery) Cache() map[protocol.DeviceID]CacheEntry {
return nil
}