diff --git a/lib/connections/structs.go b/lib/connections/structs.go index 4e243cc34..b6ed39829 100644 --- a/lib/connections/structs.go +++ b/lib/connections/structs.go @@ -9,7 +9,6 @@ package connections import ( "crypto/tls" "fmt" - "io" "net" "net/url" "time" @@ -23,7 +22,6 @@ import ( // that can be closed and has some metadata. type Connection interface { protocol.Connection - io.Closer Type() string Transport() string RemoteAddr() net.Addr @@ -39,6 +37,11 @@ type completeConn struct { protocol.Connection } +func (c completeConn) Close(err error) { + c.Connection.Close(err) + c.internalConn.Close() +} + // internalConn is the raw TLS connection plus some metadata on where it // came from (type, priority). type internalConn struct { @@ -82,6 +85,14 @@ func (t connType) Transport() string { } } +func (c internalConn) Close() { + // *tls.Conn.Close() does more than it says on the tin. Specifically, it + // sends a TLS alert message, which might block forever if the + // connection is dead and we don't have a deadline set. + c.SetWriteDeadline(time.Now().Add(250 * time.Millisecond)) + c.Conn.Close() +} + func (c internalConn) Type() string { return c.connType.String() } diff --git a/lib/model/model.go b/lib/model/model.go index a96775fb0..16ba5ba88 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -8,11 +8,9 @@ package model import ( "bytes" - "crypto/tls" "encoding/json" "errors" "fmt" - "io" "net" "path/filepath" "reflect" @@ -128,6 +126,9 @@ var ( errFolderNotRunning = errors.New("folder is not running") errFolderMissing = errors.New("no such folder") errNetworkNotAllowed = errors.New("network not allowed") + // errors about why a connection is closed + errIgnoredFolderRemoved = errors.New("folder no longer ignored") + errReplacingConnection = errors.New("replacing connection") ) // NewModel creates and starts a new model. The model starts in read-only mode, @@ -226,7 +227,7 @@ func (m *Model) startFolderLocked(folder string) config.FolderType { // Close connections to affected devices for _, id := range cfg.DeviceIDs() { - m.closeLocked(id) + m.closeLocked(id, fmt.Errorf("started folder %v", cfg.Description())) } v, ok := fs.Sequence(protocol.LocalDeviceID), true @@ -339,7 +340,7 @@ func (m *Model) RemoveFolder(cfg config.FolderConfiguration) { // Delete syncthing specific files cfg.Filesystem().RemoveAll(config.DefaultMarkerName) - m.tearDownFolderLocked(cfg) + m.tearDownFolderLocked(cfg, fmt.Errorf("removing folder %v", cfg.Description())) // Remove it from the database db.DropFolder(m.db, cfg.ID) @@ -347,12 +348,12 @@ func (m *Model) RemoveFolder(cfg config.FolderConfiguration) { m.fmut.Unlock() } -func (m *Model) tearDownFolderLocked(cfg config.FolderConfiguration) { +func (m *Model) tearDownFolderLocked(cfg config.FolderConfiguration, err error) { // Close connections to affected devices // Must happen before stopping the folder service to abort ongoing // transmissions and thus allow timely service termination. for _, dev := range cfg.Devices { - m.closeLocked(dev.DeviceID) + m.closeLocked(dev.DeviceID, err) } // Stop the services running for this folder and wait for them to finish @@ -398,14 +399,26 @@ func (m *Model) RestartFolder(from, to config.FolderConfiguration) { defer m.fmut.Unlock() defer m.pmut.Unlock() - m.tearDownFolderLocked(from) - if to.Paused { - l.Infoln("Paused folder", to.Description()) - } else { - m.addFolderLocked(to) - folderType := m.startFolderLocked(to.ID) - l.Infoln("Restarted folder", to.Description(), fmt.Sprintf("(%s)", folderType)) + var infoMsg string + var errMsg string + switch { + case to.Paused: + infoMsg = "Paused" + errMsg = "pausing" + case from.Paused: + infoMsg = "Unpaused" + errMsg = "unpausing" + default: + infoMsg = "Restarted" + errMsg = "restarting" } + + m.tearDownFolderLocked(from, fmt.Errorf("%v folder %v", errMsg, to.Description())) + if !to.Paused { + m.addFolderLocked(to) + m.startFolderLocked(to.ID) + } + l.Infof("%v folder %v (%v)", infoMsg, to.Description(), to.Type) } func (m *Model) UsageReportingStats(version int, preview bool) map[string]interface{} { @@ -1342,21 +1355,21 @@ func (m *Model) Closed(conn protocol.Connection, err error) { } // close will close the underlying connection for a given device -func (m *Model) close(device protocol.DeviceID) { +func (m *Model) close(device protocol.DeviceID, err error) { m.pmut.Lock() - m.closeLocked(device) + m.closeLocked(device, err) m.pmut.Unlock() } // closeLocked will close the underlying connection for a given device -func (m *Model) closeLocked(device protocol.DeviceID) { +func (m *Model) closeLocked(device protocol.DeviceID, err error) { conn, ok := m.conn[device] if !ok { // There is no connection to close return } - closeRawConn(conn) + conn.Close(err) } // Implements protocol.RequestResponse @@ -1719,7 +1732,7 @@ func (m *Model) AddConnection(conn connections.Connection, hello protocol.HelloR // back into Closed() for the cleanup. closed := m.closed[deviceID] m.pmut.Unlock() - closeRawConn(oldConn) + oldConn.Close(errReplacingConnection) <-closed m.pmut.Lock() } @@ -2582,6 +2595,10 @@ func (m *Model) CommitConfiguration(from, to config.Configuration) bool { continue } + if fromCfg.Paused && toCfg.Paused { + continue + } + // This folder exists on both sides. Settings might have changed. // Check if anything differs that requires a restart. if !reflect.DeepEqual(fromCfg.RequiresRestartOnly(), toCfg.RequiresRestartOnly()) { @@ -2616,12 +2633,12 @@ func (m *Model) CommitConfiguration(from, to config.Configuration) bool { // Ignored folder was removed, reconnect to retrigger the prompt. if len(fromCfg.IgnoredFolders) > len(toCfg.IgnoredFolders) { - m.close(deviceID) + m.close(deviceID, errIgnoredFolderRemoved) } if toCfg.Paused { l.Infoln("Pausing", deviceID) - m.close(deviceID) + m.close(deviceID, errDevicePaused) events.Default.Log(events.DevicePaused, map[string]string{"device": deviceID.String()}) } else { events.Default.Log(events.DeviceResumed, map[string]string{"device": deviceID.String()}) @@ -2717,17 +2734,6 @@ func getChunk(data []string, skip, get int) ([]string, int, int) { return data[skip : skip+get], 0, 0 } -func closeRawConn(conn io.Closer) error { - if conn, ok := conn.(*tls.Conn); ok { - // If the underlying connection is a *tls.Conn, Close() does more - // than it says on the tin. Specifically, it sends a TLS alert - // message, which might block forever if the connection is dead - // and we don't have a deadline set. - conn.SetWriteDeadline(time.Now().Add(250 * time.Millisecond)) - } - return conn.Close() -} - func stringSliceWithout(ss []string, s string) []string { for i := range ss { if ss[i] == s { diff --git a/lib/model/model_test.go b/lib/model/model_test.go index e8f286c8c..6b909db6f 100644 --- a/lib/model/model_test.go +++ b/lib/model/model_test.go @@ -316,11 +316,10 @@ type fakeConnection struct { mut sync.Mutex } -func (f *fakeConnection) Close() error { +func (f *fakeConnection) Close(_ error) { f.mut.Lock() defer f.mut.Unlock() f.closed = true - return nil } func (f *fakeConnection) Start() { diff --git a/lib/protocol/protocol.go b/lib/protocol/protocol.go index 384dab499..e6f6e8983 100644 --- a/lib/protocol/protocol.go +++ b/lib/protocol/protocol.go @@ -143,6 +143,7 @@ type RequestResponse interface { type Connection interface { Start() + Close(err error) ID() DeviceID Name() string Index(folder string, files []FileInfo) error @@ -171,6 +172,7 @@ type rawConnection struct { nextIDMut sync.Mutex outbox chan asyncMessage + sendClose chan asyncMessage closed chan struct{} once sync.Once compression Compression @@ -214,6 +216,7 @@ func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiv cw: cw, awaiting: make(map[int32]chan asyncResult), outbox: make(chan asyncMessage), + sendClose: make(chan asyncMessage), closed: make(chan struct{}), compression: compress, } @@ -334,7 +337,7 @@ func (c *rawConnection) ping() bool { func (c *rawConnection) readerLoop() (err error) { defer func() { - c.close(err) + c.internalClose(err) }() fourByteBuf := make([]byte, 4) @@ -636,10 +639,15 @@ func (c *rawConnection) writerLoop() { close(hm.done) } if err != nil { - c.close(err) + c.internalClose(err) return } + case m := <-c.sendClose: + c.writeMessage(m) + close(m.done) + return // No message must be sent after the Close message. + case <-c.closed: return } @@ -801,24 +809,47 @@ func (c *rawConnection) shouldCompressMessage(msg message) bool { } } -func (c *rawConnection) close(err error) { +// Close is called when the connection is regularely closed and thus the Close +// BEP message is sent before terminating the actual connection. The error +// argument specifies the reason for closing the connection. +func (c *rawConnection) Close(err error) { c.once.Do(func() { - l.Debugln("close due to", err) - close(c.closed) + done := make(chan struct{}) + c.sendClose <- asyncMessage{&Close{err.Error()}, done} + <-done - c.awaitingMut.Lock() - for i, ch := range c.awaiting { - if ch != nil { - close(ch) - delete(c.awaiting, i) - } - } - c.awaitingMut.Unlock() - - c.receiver.Closed(c, err) + // No more sends are necessary, therefore closing the underlying + // connection can happen at the same time as the internal cleanup. + // And this prevents a potential deadlock due to calling c.receiver.Closed + go c.commonClose(err) }) } +// internalClose is called if there is an unexpected error during normal operation. +func (c *rawConnection) internalClose(err error) { + c.once.Do(func() { + c.commonClose(err) + }) +} + +// commonClose is a utility function that must only be called from within +// rawConnection.once.Do (i.e. in Close and close). +func (c *rawConnection) commonClose(err error) { + l.Debugln("close due to", err) + close(c.closed) + + c.awaitingMut.Lock() + for i, ch := range c.awaiting { + if ch != nil { + close(ch) + delete(c.awaiting, i) + } + } + c.awaitingMut.Unlock() + + c.receiver.Closed(c, err) +} + // The pingSender makes sure that we've sent a message within the last // PingSendInterval. If we already have something sent in the last // PingSendInterval/2, we do nothing. Otherwise we send a ping message. This @@ -859,7 +890,7 @@ func (c *rawConnection) pingReceiver() { d := time.Since(c.cr.Last()) if d > ReceiveTimeout { l.Debugln(c.id, "ping timeout", d) - c.close(ErrTimeout) + c.internalClose(ErrTimeout) } l.Debugln(c.id, "last read within", d) diff --git a/lib/protocol/protocol_test.go b/lib/protocol/protocol_test.go index 2108a9733..b567b6270 100644 --- a/lib/protocol/protocol_test.go +++ b/lib/protocol/protocol_test.go @@ -56,7 +56,7 @@ func TestClose(t *testing.T) { c0.ClusterConfig(ClusterConfig{}) c1.ClusterConfig(ClusterConfig{}) - c0.close(errors.New("manual close")) + c0.internalClose(errors.New("manual close")) <-c0.closed if err := m0.closedError(); err == nil || !strings.Contains(err.Error(), "manual close") {