lib/connections: More fine-grained locking (fixes #3066)

This fixes the deadlock by reducing the scope in which we hold the various
locks. To start with, it splits up the existing "mut" into a "listenersMut"
and a "curConMut", as these are the two things being protected and I can see
no relation between them that requires a shared lock. It also moves all
model calls outside of the lock, as I see no reason to hold the lock
while calling the model (and it's risky, as proven).

GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/3069
This commit is contained in:
Jakob Borg 2016-05-09 15:03:12 +00:00 committed by Audrius Butkevicius
parent 1a703efa78
commit 31f64186ae

View File

@ -54,9 +54,11 @@ type Service struct {
natService *nat.Service natService *nat.Service
natServiceToken *suture.ServiceToken natServiceToken *suture.ServiceToken
mut sync.RWMutex listenersMut sync.RWMutex
listeners map[string]genericListener listeners map[string]genericListener
listenerTokens map[string]suture.ServiceToken listenerTokens map[string]suture.ServiceToken
curConMut sync.Mutex
currentConnection map[protocol.DeviceID]Connection currentConnection map[protocol.DeviceID]Connection
} }
@ -76,9 +78,11 @@ func NewService(cfg *config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *
lans: lans, lans: lans,
natService: nat.NewService(myID, cfg), natService: nat.NewService(myID, cfg),
mut: sync.NewRWMutex(), listenersMut: sync.NewRWMutex(),
listeners: make(map[string]genericListener), listeners: make(map[string]genericListener),
listenerTokens: make(map[string]suture.ServiceToken), listenerTokens: make(map[string]suture.ServiceToken),
curConMut: sync.NewMutex(),
currentConnection: make(map[protocol.DeviceID]Connection), currentConnection: make(map[protocol.DeviceID]Connection),
} }
cfg.Subscribe(service) cfg.Subscribe(service)
@ -153,9 +157,9 @@ next:
// If we have a relay connection, and the new incoming connection is // If we have a relay connection, and the new incoming connection is
// not a relay connection, we should drop that, and prefer this one. // not a relay connection, we should drop that, and prefer this one.
s.mut.RLock() s.curConMut.Lock()
skip := false
ct, ok := s.currentConnection[remoteID] ct, ok := s.currentConnection[remoteID]
s.curConMut.Unlock()
// Lower priority is better, just like nice etc. // Lower priority is better, just like nice etc.
if ok && ct.Priority > c.Priority { if ok && ct.Priority > c.Priority {
@ -170,14 +174,10 @@ next:
// connections still established... // connections still established...
l.Infof("Connected to already connected device (%s)", remoteID) l.Infof("Connected to already connected device (%s)", remoteID)
c.Close() c.Close()
skip = true continue
} else if s.model.IsPaused(remoteID) { } else if s.model.IsPaused(remoteID) {
l.Infof("Connection from paused device (%s)", remoteID) l.Infof("Connection from paused device (%s)", remoteID)
c.Close() c.Close()
skip = true
}
s.mut.RUnlock()
if skip {
continue continue
} }
@ -222,10 +222,10 @@ next:
l.Infof("Established secure connection to %s at %s", remoteID, name) l.Infof("Established secure connection to %s at %s", remoteID, name)
l.Debugf("cipher suite: %04X in lan: %t", c.ConnectionState().CipherSuite, !limit) l.Debugf("cipher suite: %04X in lan: %t", c.ConnectionState().CipherSuite, !limit)
s.mut.Lock()
s.model.AddConnection(modelConn, hello) s.model.AddConnection(modelConn, hello)
s.curConMut.Lock()
s.currentConnection[remoteID] = modelConn s.currentConnection[remoteID] = modelConn
s.mut.Unlock() s.curConMut.Unlock()
continue next continue next
} }
} }
@ -239,6 +239,7 @@ func (s *Service) connect() {
nextDial := make(map[string]time.Time) nextDial := make(map[string]time.Time)
delay := time.Second delay := time.Second
sleep := time.Second sleep := time.Second
for { for {
l.Debugln("Reconnect loop") l.Debugln("Reconnect loop")
@ -251,18 +252,18 @@ func (s *Service) connect() {
continue continue
} }
l.Debugln("Reconnect loop for", deviceID)
s.mut.RLock()
paused := s.model.IsPaused(deviceID) paused := s.model.IsPaused(deviceID)
connected := s.model.ConnectedTo(deviceID)
ct := s.currentConnection[deviceID]
s.mut.RUnlock()
if paused { if paused {
continue continue
} }
l.Debugln("Reconnect loop for", deviceID)
connected := s.model.ConnectedTo(deviceID)
s.curConMut.Lock()
ct := s.currentConnection[deviceID]
s.curConMut.Unlock()
var addrs []string var addrs []string
for _, addr := range deviceCfg.Addresses { for _, addr := range deviceCfg.Addresses {
if addr == "dynamic" { if addr == "dynamic" {
@ -354,6 +355,7 @@ func (s *Service) shouldLimit(addr net.Addr) bool {
} }
func (s *Service) createListener(addr string) { func (s *Service) createListener(addr string) {
// must be called with listenersMut held
uri, err := url.Parse(addr) uri, err := url.Parse(addr)
if err != nil { if err != nil {
l.Infoln("Failed to parse listen address:", addr, err) l.Infoln("Failed to parse listen address:", addr, err)
@ -368,10 +370,8 @@ func (s *Service) createListener(addr string) {
listener := listenerFactory(uri, s.tlsCfg, s.conns, s.natService) listener := listenerFactory(uri, s.tlsCfg, s.conns, s.natService)
listener.OnAddressesChanged(s.logListenAddressesChangedEvent) listener.OnAddressesChanged(s.logListenAddressesChangedEvent)
s.mut.Lock()
s.listeners[addr] = listener s.listeners[addr] = listener
s.listenerTokens[addr] = s.Add(listener) s.listenerTokens[addr] = s.Add(listener)
s.mut.Unlock()
} }
func (s *Service) logListenAddressesChangedEvent(l genericListener) { func (s *Service) logListenAddressesChangedEvent(l genericListener) {
@ -402,21 +402,16 @@ func (s *Service) CommitConfiguration(from, to config.Configuration) bool {
} }
} }
s.mut.RLock() s.listenersMut.Lock()
existingListeners := s.listeners
s.mut.RUnlock()
seen := make(map[string]struct{}) seen := make(map[string]struct{})
for _, addr := range config.Wrap("", to).ListenAddresses() { for _, addr := range config.Wrap("", to).ListenAddresses() {
if _, ok := existingListeners[addr]; !ok { if _, ok := s.listeners[addr]; !ok {
l.Debugln("Staring listener", addr) l.Debugln("Staring listener", addr)
s.createListener(addr) s.createListener(addr)
} }
seen[addr] = struct{}{} seen[addr] = struct{}{}
} }
s.mut.Lock()
for addr := range s.listeners { for addr := range s.listeners {
if _, ok := seen[addr]; !ok { if _, ok := seen[addr]; !ok {
l.Debugln("Stopping listener", addr) l.Debugln("Stopping listener", addr)
@ -425,7 +420,7 @@ func (s *Service) CommitConfiguration(from, to config.Configuration) bool {
delete(s.listeners, addr) delete(s.listeners, addr)
} }
} }
s.mut.Unlock() s.listenersMut.Unlock()
if to.Options.NATEnabled && s.natServiceToken == nil { if to.Options.NATEnabled && s.natServiceToken == nil {
l.Debugln("Starting NAT service") l.Debugln("Starting NAT service")
@ -441,7 +436,7 @@ func (s *Service) CommitConfiguration(from, to config.Configuration) bool {
} }
func (s *Service) AllAddresses() []string { func (s *Service) AllAddresses() []string {
s.mut.RLock() s.listenersMut.RLock()
var addrs []string var addrs []string
for _, listener := range s.listeners { for _, listener := range s.listeners {
for _, lanAddr := range listener.LANAddresses() { for _, lanAddr := range listener.LANAddresses() {
@ -451,24 +446,24 @@ func (s *Service) AllAddresses() []string {
addrs = append(addrs, wanAddr.String()) addrs = append(addrs, wanAddr.String())
} }
} }
s.mut.RUnlock() s.listenersMut.RUnlock()
return util.UniqueStrings(addrs) return util.UniqueStrings(addrs)
} }
func (s *Service) ExternalAddresses() []string { func (s *Service) ExternalAddresses() []string {
s.mut.RLock() s.listenersMut.RLock()
var addrs []string var addrs []string
for _, listener := range s.listeners { for _, listener := range s.listeners {
for _, wanAddr := range listener.WANAddresses() { for _, wanAddr := range listener.WANAddresses() {
addrs = append(addrs, wanAddr.String()) addrs = append(addrs, wanAddr.String())
} }
} }
s.mut.RUnlock() s.listenersMut.RUnlock()
return util.UniqueStrings(addrs) return util.UniqueStrings(addrs)
} }
func (s *Service) Status() map[string]interface{} { func (s *Service) Status() map[string]interface{} {
s.mut.RLock() s.listenersMut.RLock()
result := make(map[string]interface{}) result := make(map[string]interface{})
for addr, listener := range s.listeners { for addr, listener := range s.listeners {
status := make(map[string]interface{}) status := make(map[string]interface{})
@ -483,7 +478,7 @@ func (s *Service) Status() map[string]interface{} {
result[addr] = status result[addr] = status
} }
s.mut.RUnlock() s.listenersMut.RUnlock()
return result return result
} }