lib/model, lib/protocol: Track closing connections (fixes #5828) (#5829)

This commit is contained in:
Simon Frei 2019-07-14 11:03:55 +02:00 committed by Jakob Borg
parent def4b8cee5
commit 82b70b9fae
2 changed files with 34 additions and 20 deletions

View File

@ -222,16 +222,8 @@ func (m *model) Stop() {
for id := range devs { for id := range devs {
ids = append(ids, id) ids = append(ids, id)
} }
m.pmut.RLock() w := m.closeConns(ids, errStopped)
closed := make([]chan struct{}, 0, len(m.closed)) w.Wait()
for _, c := range m.closed {
closed = append(closed, c)
}
m.pmut.RUnlock()
m.closeConns(ids, errStopped)
for _, c := range closed {
<-c
}
} }
// StartDeadlockDetector starts a deadlock detector on the models locks which // StartDeadlockDetector starts a deadlock detector on the models locks which
@ -416,12 +408,16 @@ func (m *model) tearDownFolderLocked(cfg config.FolderConfiguration, err error)
// Close connections to affected devices // Close connections to affected devices
// Must happen before stopping the folder service to abort ongoing // Must happen before stopping the folder service to abort ongoing
// transmissions and thus allow timely service termination. // transmissions and thus allow timely service termination.
m.closeConns(cfg.DeviceIDs(), err) w := m.closeConns(cfg.DeviceIDs(), err)
for _, id := range tokens { for _, id := range tokens {
m.RemoveAndWait(id, 0) m.RemoveAndWait(id, 0)
} }
// Wait for connections to stop to ensure that no more calls to methods
// expecting this folder to exist happen (e.g. .IndexUpdate).
w.Wait()
m.fmut.Lock() m.fmut.Lock()
// Clean up our config maps // Clean up our config maps
@ -1440,23 +1436,39 @@ func (m *model) Closed(conn protocol.Connection, err error) {
close(closed) close(closed)
} }
// closeConns will close the underlying connection for given devices // closeConns will close the underlying connection for given devices and return
func (m *model) closeConns(devs []protocol.DeviceID, err error) { // a waiter that will return once all the connections are finished closing.
func (m *model) closeConns(devs []protocol.DeviceID, err error) config.Waiter {
conns := make([]connections.Connection, 0, len(devs)) conns := make([]connections.Connection, 0, len(devs))
closed := make([]chan struct{}, 0, len(devs))
m.pmut.Lock() m.pmut.Lock()
for _, dev := range devs { for _, dev := range devs {
if conn, ok := m.conn[dev]; ok { if conn, ok := m.conn[dev]; ok {
conns = append(conns, conn) conns = append(conns, conn)
closed = append(closed, m.closed[dev])
} }
} }
m.pmut.Unlock() m.pmut.Unlock()
for _, conn := range conns { for _, conn := range conns {
conn.Close(err) conn.Close(err)
} }
return &channelWaiter{chans: closed}
} }
func (m *model) closeConn(dev protocol.DeviceID, err error) { // closeConn closes the underlying connection for the given device and returns
m.closeConns([]protocol.DeviceID{dev}, err) // a waiter that will return once the connection is finished closing.
func (m *model) closeConn(dev protocol.DeviceID, err error) config.Waiter {
return m.closeConns([]protocol.DeviceID{dev}, err)
}
type channelWaiter struct {
chans []chan struct{}
}
func (w *channelWaiter) Wait() {
for _, c := range w.chans {
<-c
}
} }
// Implements protocol.RequestResponse // Implements protocol.RequestResponse
@ -2537,7 +2549,6 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
if toCfg.Paused { if toCfg.Paused {
l.Infoln("Pausing", deviceID) l.Infoln("Pausing", deviceID)
m.closeConn(deviceID, errDevicePaused)
events.Default.Log(events.DevicePaused, map[string]string{"device": deviceID.String()}) events.Default.Log(events.DevicePaused, map[string]string{"device": deviceID.String()})
} else { } else {
events.Default.Log(events.DeviceResumed, map[string]string{"device": deviceID.String()}) events.Default.Log(events.DeviceResumed, map[string]string{"device": deviceID.String()})

View File

@ -187,6 +187,7 @@ type rawConnection struct {
closeBox chan asyncMessage closeBox chan asyncMessage
clusterConfigBox chan *ClusterConfig clusterConfigBox chan *ClusterConfig
dispatcherLoopStopped chan struct{} dispatcherLoopStopped chan struct{}
preventSends chan struct{}
closed chan struct{} closed chan struct{}
closeOnce sync.Once closeOnce sync.Once
sendCloseOnce sync.Once sendCloseOnce sync.Once
@ -240,6 +241,7 @@ func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiv
closeBox: make(chan asyncMessage), closeBox: make(chan asyncMessage),
clusterConfigBox: make(chan *ClusterConfig), clusterConfigBox: make(chan *ClusterConfig),
dispatcherLoopStopped: make(chan struct{}), dispatcherLoopStopped: make(chan struct{}),
preventSends: make(chan struct{}),
closed: make(chan struct{}), closed: make(chan struct{}),
compression: compress, compression: compress,
} }
@ -662,13 +664,14 @@ func (c *rawConnection) send(msg message, done chan struct{}) bool {
select { select {
case c.outbox <- asyncMessage{msg, done}: case c.outbox <- asyncMessage{msg, done}:
return true return true
case <-c.preventSends:
case <-c.closed: case <-c.closed:
}
if done != nil { if done != nil {
close(done) close(done)
} }
return false return false
} }
}
func (c *rawConnection) writerLoop() { func (c *rawConnection) writerLoop() {
select { select {