From be0508cf264e67aab1056eb0810e535823b4d489 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Wed, 4 Dec 2019 10:46:55 +0100 Subject: [PATCH] lib/model, lib/protocol: Use error handling to avoid panic on non-started folder (fixes #6174) (#6212) This adds error returns to model methods called by the protocol layer. Returning an error will cause the connection to be torn down as the message couldn't be handled. Using this to signal that a folder isn't currently available will then cause a reconnection a few moments later, when it'll hopefully work better. Tested manually by running with STRECHECKDBEVERY=0 on a nontrivially sized setup. This panics reliably before this patch, but just causes a disconnect/reconnect now. --- lib/api/mocked_model_test.go | 20 ++++++++++++----- lib/model/model.go | 35 +++++++++++++++-------------- lib/protocol/benchmark_test.go | 12 ++++++---- lib/protocol/common_test.go | 12 ++++++---- lib/protocol/nativemodel_darwin.go | 8 +++---- lib/protocol/nativemodel_windows.go | 8 +++---- lib/protocol/protocol.go | 32 ++++++++++++++++---------- 7 files changed, 76 insertions(+), 51 deletions(-) diff --git a/lib/api/mocked_model_test.go b/lib/api/mocked_model_test.go index 16466a848..e967f0253 100644 --- a/lib/api/mocked_model_test.go +++ b/lib/api/mocked_model_test.go @@ -152,21 +152,29 @@ func (m *mockedModel) LocalChangedFiles(folder string, page, perpage int) []db.F return nil } -func (m *mockedModel) Serve() {} -func (m *mockedModel) Stop() {} -func (m *mockedModel) Index(deviceID protocol.DeviceID, folder string, files []protocol.FileInfo) {} -func (m *mockedModel) IndexUpdate(deviceID protocol.DeviceID, folder string, files []protocol.FileInfo) { +func (m *mockedModel) Serve() {} +func (m *mockedModel) Stop() {} + +func (m *mockedModel) Index(deviceID protocol.DeviceID, folder string, files []protocol.FileInfo) error { + return nil +} + +func (m *mockedModel) IndexUpdate(deviceID protocol.DeviceID, folder string, files []protocol.FileInfo) error { + return nil } func (m *mockedModel) Request(deviceID protocol.DeviceID, folder, name string, size int32, offset int64, hash []byte, weakHash uint32, fromTemporary bool) (protocol.RequestResponse, error) { return nil, nil } -func (m *mockedModel) ClusterConfig(deviceID protocol.DeviceID, config protocol.ClusterConfig) {} +func (m *mockedModel) ClusterConfig(deviceID protocol.DeviceID, config protocol.ClusterConfig) error { + return nil +} func (m *mockedModel) Closed(conn protocol.Connection, err error) {} -func (m *mockedModel) DownloadProgress(deviceID protocol.DeviceID, folder string, updates []protocol.FileDownloadProgressUpdate) { +func (m *mockedModel) DownloadProgress(deviceID protocol.DeviceID, folder string, updates []protocol.FileDownloadProgressUpdate) error { + return nil } func (m *mockedModel) AddConnection(conn connections.Connection, hello protocol.HelloResult) {} diff --git a/lib/model/model.go b/lib/model/model.go index ef21da24e..e635dc6c6 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -1039,17 +1039,17 @@ func (m *model) RemoteNeedFolderFiles(device protocol.DeviceID, folder string, p // Index is called when a new device is connected and we receive their full index. // Implements the protocol.Model interface. -func (m *model) Index(deviceID protocol.DeviceID, folder string, fs []protocol.FileInfo) { - m.handleIndex(deviceID, folder, fs, false) +func (m *model) Index(deviceID protocol.DeviceID, folder string, fs []protocol.FileInfo) error { + return m.handleIndex(deviceID, folder, fs, false) } // IndexUpdate is called for incremental updates to connected devices' indexes. // Implements the protocol.Model interface. -func (m *model) IndexUpdate(deviceID protocol.DeviceID, folder string, fs []protocol.FileInfo) { - m.handleIndex(deviceID, folder, fs, true) +func (m *model) IndexUpdate(deviceID protocol.DeviceID, folder string, fs []protocol.FileInfo) error { + return m.handleIndex(deviceID, folder, fs, true) } -func (m *model) handleIndex(deviceID protocol.DeviceID, folder string, fs []protocol.FileInfo, update bool) { +func (m *model) handleIndex(deviceID protocol.DeviceID, folder string, fs []protocol.FileInfo, update bool) error { op := "Index" if update { op += " update" @@ -1059,10 +1059,10 @@ func (m *model) handleIndex(deviceID protocol.DeviceID, folder string, fs []prot if cfg, ok := m.cfg.Folder(folder); !ok || !cfg.SharedWith(deviceID) { l.Infof("%v for unexpected folder ID %q sent from device %q; ensure that the folder exists and that this device is selected under \"Share With\" in the folder configuration.", op, folder, deviceID) - return + return errors.Wrap(errFolderMissing, folder) } else if cfg.Paused { l.Debugf("%v for paused folder (ID %q) sent from device %q.", op, folder, deviceID) - return + return errors.Wrap(ErrFolderPaused, folder) } m.fmut.RLock() @@ -1071,17 +1071,12 @@ func (m *model) handleIndex(deviceID protocol.DeviceID, folder string, fs []prot m.fmut.RUnlock() if !existing { - l.Warnf("%v for nonexistent folder %q", op, folder) - panic("handling index for nonexistent folder") + l.Infof("%v for nonexistent folder %q", op, folder) + return errors.Wrap(errFolderMissing, folder) } if running { defer runner.SchedulePull() - } else if update { - // Runner may legitimately not be set if this is the "cleanup" Index - // message at startup. - l.Warnf("%v for not running folder %q", op, folder) - panic("handling index for not running folder") } m.pmut.RLock() @@ -1105,9 +1100,11 @@ func (m *model) handleIndex(deviceID protocol.DeviceID, folder string, fs []prot "items": len(fs), "version": files.Sequence(deviceID), }) + + return nil } -func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterConfig) { +func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterConfig) error { // Check the peer device's announced folders against our own. Emits events // for folders that we don't expect (unknown or not shared). // Also, collect a list of folders we do share, and if he's interested in @@ -1305,6 +1302,8 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon l.Warnln("Failed to save config", err) } } + + return nil } // handleIntroductions handles adding devices/folders that are shared by an introducer device @@ -1961,13 +1960,13 @@ func (m *model) AddConnection(conn connections.Connection, hello protocol.HelloR m.deviceWasSeen(deviceID) } -func (m *model) DownloadProgress(device protocol.DeviceID, folder string, updates []protocol.FileDownloadProgressUpdate) { +func (m *model) DownloadProgress(device protocol.DeviceID, folder string, updates []protocol.FileDownloadProgressUpdate) error { m.fmut.RLock() cfg, ok := m.folderCfgs[folder] m.fmut.RUnlock() if !ok || cfg.DisableTempIndexes || !cfg.SharedWith(device) { - return + return nil } m.pmut.RLock() @@ -1981,6 +1980,8 @@ func (m *model) DownloadProgress(device protocol.DeviceID, folder string, update "folder": folder, "state": state, }) + + return nil } func (m *model) deviceWasSeen(deviceID protocol.DeviceID) { diff --git a/lib/protocol/benchmark_test.go b/lib/protocol/benchmark_test.go index 72b76f890..65fbd29c0 100644 --- a/lib/protocol/benchmark_test.go +++ b/lib/protocol/benchmark_test.go @@ -166,10 +166,12 @@ func negotiateTLS(cert tls.Certificate, conn0, conn1 net.Conn) (net.Conn, net.Co type fakeModel struct{} -func (m *fakeModel) Index(deviceID DeviceID, folder string, files []FileInfo) { +func (m *fakeModel) Index(deviceID DeviceID, folder string, files []FileInfo) error { + return nil } -func (m *fakeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo) { +func (m *fakeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo) error { + return nil } func (m *fakeModel) Request(deviceID DeviceID, folder, name string, size int32, offset int64, hash []byte, weakHash uint32, fromTemporary bool) (RequestResponse, error) { @@ -181,11 +183,13 @@ func (m *fakeModel) Request(deviceID DeviceID, folder, name string, size int32, return &fakeRequestResponse{buf}, nil } -func (m *fakeModel) ClusterConfig(deviceID DeviceID, config ClusterConfig) { +func (m *fakeModel) ClusterConfig(deviceID DeviceID, config ClusterConfig) error { + return nil } func (m *fakeModel) Closed(conn Connection, err error) { } -func (m *fakeModel) DownloadProgress(deviceID DeviceID, folder string, updates []FileDownloadProgressUpdate) { +func (m *fakeModel) DownloadProgress(deviceID DeviceID, folder string, updates []FileDownloadProgressUpdate) error { + return nil } diff --git a/lib/protocol/common_test.go b/lib/protocol/common_test.go index 1a15bcccf..a8b10555d 100644 --- a/lib/protocol/common_test.go +++ b/lib/protocol/common_test.go @@ -25,13 +25,15 @@ func newTestModel() *TestModel { } } -func (t *TestModel) Index(deviceID DeviceID, folder string, files []FileInfo) { +func (t *TestModel) Index(deviceID DeviceID, folder string, files []FileInfo) error { if t.indexFn != nil { t.indexFn(deviceID, folder, files) } + return nil } -func (t *TestModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo) { +func (t *TestModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo) error { + return nil } func (t *TestModel) Request(deviceID DeviceID, folder, name string, size int32, offset int64, hash []byte, weakHash uint32, fromTemporary bool) (RequestResponse, error) { @@ -52,13 +54,15 @@ func (t *TestModel) Closed(conn Connection, err error) { close(t.closedCh) } -func (t *TestModel) ClusterConfig(deviceID DeviceID, config ClusterConfig) { +func (t *TestModel) ClusterConfig(deviceID DeviceID, config ClusterConfig) error { if t.ccFn != nil { t.ccFn(deviceID, config) } + return nil } -func (t *TestModel) DownloadProgress(DeviceID, string, []FileDownloadProgressUpdate) { +func (t *TestModel) DownloadProgress(DeviceID, string, []FileDownloadProgressUpdate) error { + return nil } func (t *TestModel) closedError() error { diff --git a/lib/protocol/nativemodel_darwin.go b/lib/protocol/nativemodel_darwin.go index b4a20fe0b..17a50aced 100644 --- a/lib/protocol/nativemodel_darwin.go +++ b/lib/protocol/nativemodel_darwin.go @@ -12,18 +12,18 @@ type nativeModel struct { Model } -func (m nativeModel) Index(deviceID DeviceID, folder string, files []FileInfo) { +func (m nativeModel) Index(deviceID DeviceID, folder string, files []FileInfo) error { for i := range files { files[i].Name = norm.NFD.String(files[i].Name) } - m.Model.Index(deviceID, folder, files) + return m.Model.Index(deviceID, folder, files) } -func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo) { +func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo) error { for i := range files { files[i].Name = norm.NFD.String(files[i].Name) } - m.Model.IndexUpdate(deviceID, folder, files) + return m.Model.IndexUpdate(deviceID, folder, files) } func (m nativeModel) Request(deviceID DeviceID, folder, name string, size int32, offset int64, hash []byte, weakHash uint32, fromTemporary bool) (RequestResponse, error) { diff --git a/lib/protocol/nativemodel_windows.go b/lib/protocol/nativemodel_windows.go index f3076c961..b5159ebda 100644 --- a/lib/protocol/nativemodel_windows.go +++ b/lib/protocol/nativemodel_windows.go @@ -15,14 +15,14 @@ type nativeModel struct { Model } -func (m nativeModel) Index(deviceID DeviceID, folder string, files []FileInfo) { +func (m nativeModel) Index(deviceID DeviceID, folder string, files []FileInfo) error { files = fixupFiles(files) - m.Model.Index(deviceID, folder, files) + return m.Model.Index(deviceID, folder, files) } -func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo) { +func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo) error { files = fixupFiles(files) - m.Model.IndexUpdate(deviceID, folder, files) + return m.Model.IndexUpdate(deviceID, folder, files) } func (m nativeModel) Request(deviceID DeviceID, folder, name string, size int32, offset int64, hash []byte, weakHash uint32, fromTemporary bool) (RequestResponse, error) { diff --git a/lib/protocol/protocol.go b/lib/protocol/protocol.go index 1f7e9a071..ea271df05 100644 --- a/lib/protocol/protocol.go +++ b/lib/protocol/protocol.go @@ -111,17 +111,17 @@ var ( type Model interface { // An index was received from the peer device - Index(deviceID DeviceID, folder string, files []FileInfo) + Index(deviceID DeviceID, folder string, files []FileInfo) error // An index update was received from the peer device - IndexUpdate(deviceID DeviceID, folder string, files []FileInfo) + IndexUpdate(deviceID DeviceID, folder string, files []FileInfo) error // A request was made by the peer device Request(deviceID DeviceID, folder, name string, size int32, offset int64, hash []byte, weakHash uint32, fromTemporary bool) (RequestResponse, error) // A cluster configuration message was received - ClusterConfig(deviceID DeviceID, config ClusterConfig) + ClusterConfig(deviceID DeviceID, config ClusterConfig) error // The peer device closed the connection Closed(conn Connection, err error) // The peer device sent progress updates for the files it is currently downloading - DownloadProgress(deviceID DeviceID, folder string, updates []FileDownloadProgressUpdate) + DownloadProgress(deviceID DeviceID, folder string, updates []FileDownloadProgressUpdate) error } type RequestResponse interface { @@ -388,7 +388,9 @@ func (c *rawConnection) dispatcherLoop() (err error) { if state != stateInitial { return fmt.Errorf("protocol error: cluster config message in state %d", state) } - c.receiver.ClusterConfig(c.id, *msg) + if err := c.receiver.ClusterConfig(c.id, *msg); err != nil { + return errors.Wrap(err, "receiver error") + } state = stateReady case *Index: @@ -399,7 +401,9 @@ func (c *rawConnection) dispatcherLoop() (err error) { if err := checkIndexConsistency(msg.Files); err != nil { return errors.Wrap(err, "protocol error: index") } - c.handleIndex(*msg) + if err := c.handleIndex(*msg); err != nil { + return errors.Wrap(err, "receiver error") + } state = stateReady case *IndexUpdate: @@ -410,7 +414,9 @@ func (c *rawConnection) dispatcherLoop() (err error) { if err := checkIndexConsistency(msg.Files); err != nil { return errors.Wrap(err, "protocol error: index update") } - c.handleIndexUpdate(*msg) + if err := c.handleIndexUpdate(*msg); err != nil { + return errors.Wrap(err, "receiver error") + } state = stateReady case *Request: @@ -435,7 +441,9 @@ func (c *rawConnection) dispatcherLoop() (err error) { if state != stateReady { return fmt.Errorf("protocol error: response message in state %d", state) } - c.receiver.DownloadProgress(c.id, msg.Folder, msg.Updates) + if err := c.receiver.DownloadProgress(c.id, msg.Folder, msg.Updates); err != nil { + return errors.Wrap(err, "receiver error") + } case *Ping: l.Debugln("read Ping message") @@ -543,14 +551,14 @@ func (c *rawConnection) readHeader(fourByteBuf []byte) (Header, error) { return hdr, nil } -func (c *rawConnection) handleIndex(im Index) { +func (c *rawConnection) handleIndex(im Index) error { l.Debugf("Index(%v, %v, %d file)", c.id, im.Folder, len(im.Files)) - c.receiver.Index(c.id, im.Folder, im.Files) + return c.receiver.Index(c.id, im.Folder, im.Files) } -func (c *rawConnection) handleIndexUpdate(im IndexUpdate) { +func (c *rawConnection) handleIndexUpdate(im IndexUpdate) error { l.Debugf("queueing IndexUpdate(%v, %v, %d files)", c.id, im.Folder, len(im.Files)) - c.receiver.IndexUpdate(c.id, im.Folder, im.Files) + return c.receiver.IndexUpdate(c.id, im.Folder, im.Files) } // checkIndexConsistency verifies a number of invariants on FileInfos received in