diff --git a/lib/db/backend/backend.go b/lib/db/backend/backend.go index 682e64dd7..e587c073b 100644 --- a/lib/db/backend/backend.go +++ b/lib/db/backend/backend.go @@ -150,16 +150,18 @@ func IsNotFound(err error) bool { // releaser manages counting on top of a waitgroup type releaser struct { - wg *sync.WaitGroup + wg *closeWaitGroup once *sync.Once } -func newReleaser(wg *sync.WaitGroup) *releaser { - wg.Add(1) +func newReleaser(wg *closeWaitGroup) (*releaser, error) { + if err := wg.Add(1); err != nil { + return nil, err + } return &releaser{ wg: wg, once: new(sync.Once), - } + }, nil } func (r releaser) Release() { @@ -169,3 +171,29 @@ func (r releaser) Release() { r.wg.Done() }) } + +// closeWaitGroup behaves just like a sync.WaitGroup, but does not require +// a single routine to do the Add and Wait calls. If Add is called after +// CloseWait, it will return an error, and both are safe to be used concurrently. +type closeWaitGroup struct { + sync.WaitGroup + closed bool + closeMut sync.RWMutex +} + +func (cg *closeWaitGroup) Add(i int) error { + cg.closeMut.RLock() + defer cg.closeMut.RUnlock() + if cg.closed { + return errClosed{} + } + cg.WaitGroup.Add(i) + return nil +} + +func (cg *closeWaitGroup) CloseWait() { + cg.closeMut.Lock() + cg.closed = true + cg.closeMut.Unlock() + cg.WaitGroup.Wait() +} diff --git a/lib/db/backend/leveldb_backend.go b/lib/db/backend/leveldb_backend.go index b714accda..f3748f430 100644 --- a/lib/db/backend/leveldb_backend.go +++ b/lib/db/backend/leveldb_backend.go @@ -7,8 +7,6 @@ package backend import ( - "sync" - "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/util" @@ -24,7 +22,14 @@ const ( // leveldbBackend implements Backend on top of a leveldb type leveldbBackend struct { ldb *leveldb.DB - closeWG sync.WaitGroup + closeWG *closeWaitGroup +} + +func newLeveldbBackend(ldb *leveldb.DB) *leveldbBackend { + return &leveldbBackend{ + ldb: ldb, + closeWG: &closeWaitGroup{}, + } } func (b *leveldbBackend) NewReadTransaction() (ReadTransaction, error) { @@ -36,9 +41,13 @@ func (b *leveldbBackend) newSnapshot() (leveldbSnapshot, error) { if err != nil { return leveldbSnapshot{}, wrapLeveldbErr(err) } + rel, err := newReleaser(b.closeWG) + if err != nil { + return leveldbSnapshot{}, err + } return leveldbSnapshot{ snap: snap, - rel: newReleaser(&b.closeWG), + rel: rel, }, nil } @@ -47,16 +56,20 @@ func (b *leveldbBackend) NewWriteTransaction() (WriteTransaction, error) { if err != nil { return nil, err // already wrapped } + rel, err := newReleaser(b.closeWG) + if err != nil { + return nil, err + } return &leveldbTransaction{ leveldbSnapshot: snap, ldb: b.ldb, batch: new(leveldb.Batch), - rel: newReleaser(&b.closeWG), + rel: rel, }, nil } func (b *leveldbBackend) Close() error { - b.closeWG.Wait() + b.closeWG.CloseWait() return wrapLeveldbErr(b.ldb.Close()) } @@ -82,6 +95,13 @@ func (b *leveldbBackend) Delete(key []byte) error { } func (b *leveldbBackend) Compact() error { + // Race is detected during testing when db is closed while compaction + // is ongoing. + err := b.closeWG.Add(1) + if err != nil { + return err + } + defer b.closeWG.Done() return wrapLeveldbErr(b.ldb.CompactRange(util.Range{})) } diff --git a/lib/db/backend/leveldb_open.go b/lib/db/backend/leveldb_open.go index 69186e108..8e981b37b 100644 --- a/lib/db/backend/leveldb_open.go +++ b/lib/db/backend/leveldb_open.go @@ -42,7 +42,7 @@ func OpenLevelDB(location string, tuning Tuning) (Backend, error) { if err != nil { return nil, err } - return &leveldbBackend{ldb: ldb}, nil + return newLeveldbBackend(ldb), nil } // OpenRO attempts to open the database at the given location, read only. @@ -55,13 +55,13 @@ func OpenLevelDBRO(location string) (Backend, error) { if err != nil { return nil, err } - return &leveldbBackend{ldb: ldb}, nil + return newLeveldbBackend(ldb), nil } // OpenMemory returns a new Backend referencing an in-memory database. func OpenLevelDBMemory() Backend { ldb, _ := leveldb.Open(storage.NewMemStorage(), nil) - return &leveldbBackend{ldb: ldb} + return newLeveldbBackend(ldb) } // optsFor returns the database options to use when opening a database with diff --git a/lib/db/blockmap_test.go b/lib/db/blockmap_test.go index 1717599a9..6c48a53ba 100644 --- a/lib/db/blockmap_test.go +++ b/lib/db/blockmap_test.go @@ -106,6 +106,7 @@ func discardFromBlockMap(db *Lowlevel, folder []byte, fs []protocol.FileInfo) er func TestBlockMapAddUpdateWipe(t *testing.T) { db, f := setup() + defer db.Close() if !dbEmpty(db) { t.Fatal("db not empty") @@ -193,6 +194,7 @@ func TestBlockMapAddUpdateWipe(t *testing.T) { func TestBlockFinderLookup(t *testing.T) { db, f := setup() + defer db.Close() folder1 := []byte("folder1") folder2 := []byte("folder2") diff --git a/lib/db/db_test.go b/lib/db/db_test.go index 37c929883..6d7f58ac8 100644 --- a/lib/db/db_test.go +++ b/lib/db/db_test.go @@ -33,6 +33,7 @@ func TestIgnoredFiles(t *testing.T) { t.Fatal(err) } db := NewLowlevel(ldb) + defer db.Close() if err := UpdateSchema(db); err != nil { t.Fatal(err) } @@ -161,6 +162,7 @@ func TestUpdate0to3(t *testing.T) { } db := NewLowlevel(ldb) + defer db.Close() updater := schemaUpdater{db} folder := []byte(update0to3Folder) @@ -173,6 +175,7 @@ func TestUpdate0to3(t *testing.T) { if err != nil { t.Fatal(err) } + defer trans.Release() if _, ok, err := trans.getFile(folder, protocol.LocalDeviceID[:], []byte(slashPrefixed)); err != nil { t.Fatal(err) } else if ok { @@ -196,6 +199,7 @@ func TestUpdate0to3(t *testing.T) { if err != nil { t.Fatal(err) } + defer trans.Release() _ = trans.withHaveSequence(folder, 0, func(fi FileIntf) bool { f := fi.(protocol.FileInfo) l.Infoln(f) @@ -227,6 +231,7 @@ func TestUpdate0to3(t *testing.T) { if err != nil { t.Fatal(err) } + defer trans.Release() _ = trans.withNeed(folder, protocol.LocalDeviceID[:], false, func(fi FileIntf) bool { e, ok := need[fi.FileName()] if !ok { @@ -246,6 +251,7 @@ func TestUpdate0to3(t *testing.T) { func TestDowngrade(t *testing.T) { db := NewLowlevel(backend.OpenMemory()) + defer db.Close() // sets the min version etc if err := UpdateSchema(db); err != nil { t.Fatal(err) diff --git a/lib/db/keyer_test.go b/lib/db/keyer_test.go index a3b002d06..2317e14ca 100644 --- a/lib/db/keyer_test.go +++ b/lib/db/keyer_test.go @@ -19,6 +19,7 @@ func TestDeviceKey(t *testing.T) { name := []byte("name") db := NewLowlevel(backend.OpenMemory()) + defer db.Close() key, err := db.keyer.GenerateDeviceFileKey(nil, fld, dev, name) if err != nil { @@ -50,6 +51,7 @@ func TestGlobalKey(t *testing.T) { name := []byte("name") db := NewLowlevel(backend.OpenMemory()) + defer db.Close() key, err := db.keyer.GenerateGlobalVersionKey(nil, fld, name) if err != nil { @@ -78,6 +80,7 @@ func TestSequenceKey(t *testing.T) { fld := []byte("folder6789012345678901234567890123456789012345678901234567890123") db := NewLowlevel(backend.OpenMemory()) + defer db.Close() const seq = 1234567890 key, err := db.keyer.GenerateSequenceKey(nil, fld, seq) diff --git a/lib/db/namespaced_test.go b/lib/db/namespaced_test.go index d4941be8c..409cc2b19 100644 --- a/lib/db/namespaced_test.go +++ b/lib/db/namespaced_test.go @@ -15,6 +15,7 @@ import ( func TestNamespacedInt(t *testing.T) { ldb := NewLowlevel(backend.OpenMemory()) + defer ldb.Close() n1 := NewNamespacedKV(ldb, "foo") n2 := NewNamespacedKV(ldb, "bar") @@ -62,6 +63,7 @@ func TestNamespacedInt(t *testing.T) { func TestNamespacedTime(t *testing.T) { ldb := NewLowlevel(backend.OpenMemory()) + defer ldb.Close() n1 := NewNamespacedKV(ldb, "foo") @@ -85,6 +87,7 @@ func TestNamespacedTime(t *testing.T) { func TestNamespacedString(t *testing.T) { ldb := NewLowlevel(backend.OpenMemory()) + defer ldb.Close() n1 := NewNamespacedKV(ldb, "foo") @@ -107,6 +110,7 @@ func TestNamespacedString(t *testing.T) { func TestNamespacedReset(t *testing.T) { ldb := NewLowlevel(backend.OpenMemory()) + defer ldb.Close() n1 := NewNamespacedKV(ldb, "foo") diff --git a/lib/db/set_test.go b/lib/db/set_test.go index 067b92894..2a45e19cf 100644 --- a/lib/db/set_test.go +++ b/lib/db/set_test.go @@ -129,6 +129,7 @@ func (l fileList) String() string { func TestGlobalSet(t *testing.T) { ldb := db.NewLowlevel(backend.OpenMemory()) + defer ldb.Close() m := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb) @@ -346,6 +347,7 @@ func TestGlobalSet(t *testing.T) { func TestNeedWithInvalid(t *testing.T) { ldb := db.NewLowlevel(backend.OpenMemory()) + defer ldb.Close() s := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb) @@ -383,6 +385,7 @@ func TestNeedWithInvalid(t *testing.T) { func TestUpdateToInvalid(t *testing.T) { ldb := db.NewLowlevel(backend.OpenMemory()) + defer ldb.Close() folder := "test" s := db.NewFileSet(folder, fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb) @@ -439,6 +442,7 @@ func TestUpdateToInvalid(t *testing.T) { func TestInvalidAvailability(t *testing.T) { ldb := db.NewLowlevel(backend.OpenMemory()) + defer ldb.Close() s := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb) @@ -480,6 +484,7 @@ func TestInvalidAvailability(t *testing.T) { func TestGlobalReset(t *testing.T) { ldb := db.NewLowlevel(backend.OpenMemory()) + defer ldb.Close() m := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb) @@ -518,6 +523,7 @@ func TestGlobalReset(t *testing.T) { func TestNeed(t *testing.T) { ldb := db.NewLowlevel(backend.OpenMemory()) + defer ldb.Close() m := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb) @@ -556,6 +562,7 @@ func TestNeed(t *testing.T) { func TestSequence(t *testing.T) { ldb := db.NewLowlevel(backend.OpenMemory()) + defer ldb.Close() m := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb) @@ -586,6 +593,7 @@ func TestSequence(t *testing.T) { func TestListDropFolder(t *testing.T) { ldb := db.NewLowlevel(backend.OpenMemory()) + defer ldb.Close() s0 := db.NewFileSet("test0", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb) local1 := []protocol.FileInfo{ @@ -636,6 +644,7 @@ func TestListDropFolder(t *testing.T) { func TestGlobalNeedWithInvalid(t *testing.T) { ldb := db.NewLowlevel(backend.OpenMemory()) + defer ldb.Close() s := db.NewFileSet("test1", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb) @@ -677,6 +686,7 @@ func TestGlobalNeedWithInvalid(t *testing.T) { func TestLongPath(t *testing.T) { ldb := db.NewLowlevel(backend.OpenMemory()) + defer ldb.Close() s := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb) @@ -736,6 +746,7 @@ func BenchmarkUpdateOneFile(b *testing.B) { func TestIndexID(t *testing.T) { ldb := db.NewLowlevel(backend.OpenMemory()) + defer ldb.Close() s := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb) @@ -831,6 +842,7 @@ func TestDropFiles(t *testing.T) { func TestIssue4701(t *testing.T) { ldb := db.NewLowlevel(backend.OpenMemory()) + defer ldb.Close() s := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb) @@ -872,6 +884,7 @@ func TestIssue4701(t *testing.T) { func TestWithHaveSequence(t *testing.T) { ldb := db.NewLowlevel(backend.OpenMemory()) + defer ldb.Close() folder := "test" s := db.NewFileSet(folder, fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb) @@ -909,6 +922,7 @@ func TestStressWithHaveSequence(t *testing.T) { } ldb := db.NewLowlevel(backend.OpenMemory()) + defer ldb.Close() folder := "test" s := db.NewFileSet(folder, fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb) @@ -953,6 +967,7 @@ loop: func TestIssue4925(t *testing.T) { ldb := db.NewLowlevel(backend.OpenMemory()) + defer ldb.Close() folder := "test" s := db.NewFileSet(folder, fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb) @@ -979,6 +994,7 @@ func TestIssue4925(t *testing.T) { func TestMoveGlobalBack(t *testing.T) { ldb := db.NewLowlevel(backend.OpenMemory()) + defer ldb.Close() folder := "test" file := "foo" @@ -1043,6 +1059,7 @@ func TestMoveGlobalBack(t *testing.T) { // https://github.com/syncthing/syncthing/issues/5007 func TestIssue5007(t *testing.T) { ldb := db.NewLowlevel(backend.OpenMemory()) + defer ldb.Close() folder := "test" file := "foo" @@ -1070,6 +1087,7 @@ func TestIssue5007(t *testing.T) { // when the global file is deleted. func TestNeedDeleted(t *testing.T) { ldb := db.NewLowlevel(backend.OpenMemory()) + defer ldb.Close() folder := "test" file := "foo" @@ -1104,6 +1122,7 @@ func TestNeedDeleted(t *testing.T) { func TestReceiveOnlyAccounting(t *testing.T) { ldb := db.NewLowlevel(backend.OpenMemory()) + defer ldb.Close() folder := "test" s := db.NewFileSet(folder, fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb) @@ -1208,6 +1227,7 @@ func TestReceiveOnlyAccounting(t *testing.T) { func TestNeedAfterUnignore(t *testing.T) { ldb := db.NewLowlevel(backend.OpenMemory()) + defer ldb.Close() folder := "test" file := "foo" @@ -1240,6 +1260,7 @@ func TestRemoteInvalidNotAccounted(t *testing.T) { // Remote files with the invalid bit should not count. ldb := db.NewLowlevel(backend.OpenMemory()) + defer ldb.Close() s := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb) files := []protocol.FileInfo{ @@ -1259,6 +1280,7 @@ func TestRemoteInvalidNotAccounted(t *testing.T) { func TestNeedWithNewerInvalid(t *testing.T) { ldb := db.NewLowlevel(backend.OpenMemory()) + defer ldb.Close() s := db.NewFileSet("default", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb) @@ -1297,6 +1319,7 @@ func TestNeedWithNewerInvalid(t *testing.T) { func TestNeedAfterDeviceRemove(t *testing.T) { ldb := db.NewLowlevel(backend.OpenMemory()) + defer ldb.Close() file := "foo" s := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb) @@ -1324,6 +1347,7 @@ func TestCaseSensitive(t *testing.T) { // Normal case sensitive lookup should work ldb := db.NewLowlevel(backend.OpenMemory()) + defer ldb.Close() s := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb) local := []protocol.FileInfo{ @@ -1361,6 +1385,7 @@ func TestSequenceIndex(t *testing.T) { // Set up a db and a few files that we will manipulate. ldb := db.NewLowlevel(backend.OpenMemory()) + defer ldb.Close() s := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb) local := []protocol.FileInfo{ @@ -1454,6 +1479,7 @@ func TestSequenceIndex(t *testing.T) { func TestIgnoreAfterReceiveOnly(t *testing.T) { ldb := db.NewLowlevel(backend.OpenMemory()) + defer ldb.Close() file := "foo" s := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb)