From 5da41f75fab1b8cf07907789c738c59c960ace4b Mon Sep 17 00:00:00 2001 From: Simon Frei Date: Sun, 28 Apr 2019 12:58:51 +0200 Subject: [PATCH] lib/model, lib/protocol: Wait for reader/writer loops on close (fixes #4170) (#5657) * lib/protocol: Wait for reader/writer loops on close (fixes #4170) * waitgroup * lib/model: Don't hold lock while closing connection * fix comments * review (lock once, func argument) and naming --- lib/model/model.go | 68 +++++++++++++++++++--------------------- lib/protocol/protocol.go | 26 ++++++++++----- 2 files changed, 50 insertions(+), 44 deletions(-) diff --git a/lib/model/model.go b/lib/model/model.go index 54d20d516..ad9eabb64 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -238,15 +238,14 @@ func (m *model) StartDeadlockDetector(timeout time.Duration) { // StartFolder constructs the folder service and starts it. func (m *model) StartFolder(folder string) { m.fmut.Lock() - m.pmut.Lock() + defer m.fmut.Unlock() folderCfg := m.folderCfgs[folder] m.startFolderLocked(folderCfg) - m.pmut.Unlock() - m.fmut.Unlock() l.Infof("Ready to synchronize %s (%s)", folderCfg.Description(), folderCfg.Type) } +// Need to hold lock on m.fmut when calling this. func (m *model) startFolderLocked(cfg config.FolderConfiguration) { if err := m.checkFolderRunningLocked(cfg.ID); err == errFolderMissing { panic("cannot start nonexistent folder " + cfg.Description()) @@ -274,9 +273,9 @@ func (m *model) startFolderLocked(cfg config.FolderConfiguration) { } // Close connections to affected devices - for _, id := range cfg.DeviceIDs() { - m.closeLocked(id, fmt.Errorf("started folder %v", cfg.Description())) - } + m.fmut.Unlock() + m.closeConns(cfg.DeviceIDs(), fmt.Errorf("started folder %v", cfg.Description())) + m.fmut.Lock() v, ok := fset.Sequence(protocol.LocalDeviceID), true indexHasFiles := ok && v > 0 @@ -382,9 +381,7 @@ func (m *model) addFolderLocked(cfg config.FolderConfiguration) { func (m *model) RemoveFolder(cfg config.FolderConfiguration) { m.fmut.Lock() - m.pmut.Lock() defer m.fmut.Unlock() - defer m.pmut.Unlock() // Delete syncthing specific files cfg.Filesystem().RemoveAll(config.DefaultMarkerName) @@ -394,24 +391,24 @@ func (m *model) RemoveFolder(cfg config.FolderConfiguration) { db.DropFolder(m.db, cfg.ID) } +// Need to hold lock on m.fmut when calling this. func (m *model) tearDownFolderLocked(cfg config.FolderConfiguration, err error) { - // Close connections to affected devices - // Must happen before stopping the folder service to abort ongoing - // transmissions and thus allow timely service termination. - for _, dev := range cfg.Devices { - m.closeLocked(dev.DeviceID, err) - } - // Stop the services running for this folder and wait for them to finish // stopping to prevent races on restart. tokens := m.folderRunnerTokens[cfg.ID] - m.pmut.Unlock() + m.fmut.Unlock() + + // Close connections to affected devices + // Must happen before stopping the folder service to abort ongoing + // transmissions and thus allow timely service termination. + m.closeConns(cfg.DeviceIDs(), err) + for _, id := range tokens { m.RemoveAndWait(id, 0) } + m.fmut.Lock() - m.pmut.Lock() // Clean up our config maps delete(m.folderCfgs, cfg.ID) @@ -439,11 +436,6 @@ func (m *model) RestartFolder(from, to config.FolderConfiguration) { restartMut.Lock() defer restartMut.Unlock() - m.fmut.Lock() - m.pmut.Lock() - defer m.fmut.Unlock() - defer m.pmut.Unlock() - var infoMsg string var errMsg string switch { @@ -458,6 +450,9 @@ func (m *model) RestartFolder(from, to config.FolderConfiguration) { errMsg = "restarting" } + m.fmut.Lock() + defer m.fmut.Unlock() + m.tearDownFolderLocked(from, fmt.Errorf("%v folder %v", errMsg, to.Description())) if !to.Paused { m.addFolderLocked(to) @@ -1409,22 +1404,23 @@ func (m *model) Closed(conn protocol.Connection, err error) { close(closed) } -// close will close the underlying connection for a given device -func (m *model) close(device protocol.DeviceID, err error) { +// closeConns will close the underlying connection for given devices +func (m *model) closeConns(devs []protocol.DeviceID, err error) { + conns := make([]connections.Connection, 0, len(devs)) m.pmut.Lock() - m.closeLocked(device, err) + for _, dev := range devs { + if conn, ok := m.conn[dev]; ok { + conns = append(conns, conn) + } + } m.pmut.Unlock() + for _, conn := range conns { + conn.Close(err) + } } -// closeLocked will close the underlying connection for a given device -func (m *model) closeLocked(device protocol.DeviceID, err error) { - conn, ok := m.conn[device] - if !ok { - // There is no connection to close - return - } - - conn.Close(err) +func (m *model) closeConn(dev protocol.DeviceID, err error) { + m.closeConns([]protocol.DeviceID{dev}, err) } // Implements protocol.RequestResponse @@ -2569,12 +2565,12 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool { // Ignored folder was removed, reconnect to retrigger the prompt. if len(fromCfg.IgnoredFolders) > len(toCfg.IgnoredFolders) { - m.close(deviceID, errIgnoredFolderRemoved) + m.closeConn(deviceID, errIgnoredFolderRemoved) } if toCfg.Paused { l.Infoln("Pausing", deviceID) - m.close(deviceID, errDevicePaused) + m.closeConn(deviceID, errDevicePaused) events.Default.Log(events.DevicePaused, map[string]string{"device": deviceID.String()}) } else { events.Default.Log(events.DeviceResumed, map[string]string{"device": deviceID.String()}) diff --git a/lib/protocol/protocol.go b/lib/protocol/protocol.go index e283db47e..ca9292f3f 100644 --- a/lib/protocol/protocol.go +++ b/lib/protocol/protocol.go @@ -187,6 +187,7 @@ type rawConnection struct { closed chan struct{} closeOnce sync.Once sendCloseOnce sync.Once + wg sync.WaitGroup compression Compression } @@ -239,6 +240,7 @@ func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiv // Start creates the goroutines for sending and receiving of messages. It must // be called exactly once after creating a connection. func (c *rawConnection) Start() { + c.wg.Add(4) go func() { err := c.readerLoop() c.internalClose(err) @@ -362,13 +364,12 @@ func (c *rawConnection) ping() bool { } func (c *rawConnection) readerLoop() (err error) { + defer c.wg.Done() fourByteBuf := make([]byte, 4) state := stateInitial for { - select { - case <-c.closed: + if c.Closed() { return ErrClosed - default: } msg, err := c.readMessage(fourByteBuf) @@ -660,6 +661,7 @@ func (c *rawConnection) send(msg message, done chan struct{}) bool { } func (c *rawConnection) writerLoop() { + defer c.wg.Done() for { select { case hm := <-c.outbox: @@ -846,10 +848,7 @@ func (c *rawConnection) Close(err error) { } }) - // No more sends are necessary, therefore further steps to close the - // connection outside of this package can proceed immediately. - // And this prevents a potential deadlock due to calling c.receiver.Closed - go c.internalClose(err) + c.internalClose(err) } // internalClose is called if there is an unexpected error during normal operation. @@ -867,7 +866,14 @@ func (c *rawConnection) internalClose(err error) { } c.awaitingMut.Unlock() - c.receiver.Closed(c, err) + // Wait for all our operations to terminate before signaling + // to the receiver that the connection was closed. + c.wg.Wait() + + // No more sends are necessary, therefore further steps to close the + // connection outside of this package can proceed immediately. + // And this prevents a potential deadlock. + go c.receiver.Closed(c, err) }) } @@ -877,6 +883,8 @@ func (c *rawConnection) internalClose(err error) { // results in an effecting ping interval of somewhere between // PingSendInterval/2 and PingSendInterval. func (c *rawConnection) pingSender() { + defer c.wg.Done() + ticker := time.NewTicker(PingSendInterval / 2) defer ticker.Stop() @@ -902,6 +910,8 @@ func (c *rawConnection) pingSender() { // but we expect pings in the absence of other messages) within the last // ReceiveTimeout. If not, we close the connection with an ErrTimeout. func (c *rawConnection) pingReceiver() { + defer c.wg.Done() + ticker := time.NewTicker(ReceiveTimeout / 2) defer ticker.Stop()