lib/db: Add closeWaitGroup to allow async operation (#6317)

This commit is contained in:
Simon Frei 2020-02-11 14:31:43 +01:00 committed by GitHub
parent b61da487e4
commit 29736b1e33
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 102 additions and 13 deletions

View File

@ -150,16 +150,18 @@ func IsNotFound(err error) bool {
// releaser manages counting on top of a waitgroup
type releaser struct {
wg *sync.WaitGroup
wg *closeWaitGroup
once *sync.Once
}
func newReleaser(wg *sync.WaitGroup) *releaser {
wg.Add(1)
func newReleaser(wg *closeWaitGroup) (*releaser, error) {
if err := wg.Add(1); err != nil {
return nil, err
}
return &releaser{
wg: wg,
once: new(sync.Once),
}
}, nil
}
func (r releaser) Release() {
@ -169,3 +171,29 @@ func (r releaser) Release() {
r.wg.Done()
})
}
// closeWaitGroup behaves just like a sync.WaitGroup, but does not require
// a single routine to do the Add and Wait calls. If Add is called after
// CloseWait, it will return an error, and both are safe to be used concurrently.
type closeWaitGroup struct {
sync.WaitGroup
closed bool
closeMut sync.RWMutex
}
func (cg *closeWaitGroup) Add(i int) error {
cg.closeMut.RLock()
defer cg.closeMut.RUnlock()
if cg.closed {
return errClosed{}
}
cg.WaitGroup.Add(i)
return nil
}
func (cg *closeWaitGroup) CloseWait() {
cg.closeMut.Lock()
cg.closed = true
cg.closeMut.Unlock()
cg.WaitGroup.Wait()
}

View File

@ -7,8 +7,6 @@
package backend
import (
"sync"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/util"
@ -24,7 +22,14 @@ const (
// leveldbBackend implements Backend on top of a leveldb
type leveldbBackend struct {
ldb *leveldb.DB
closeWG sync.WaitGroup
closeWG *closeWaitGroup
}
func newLeveldbBackend(ldb *leveldb.DB) *leveldbBackend {
return &leveldbBackend{
ldb: ldb,
closeWG: &closeWaitGroup{},
}
}
func (b *leveldbBackend) NewReadTransaction() (ReadTransaction, error) {
@ -36,9 +41,13 @@ func (b *leveldbBackend) newSnapshot() (leveldbSnapshot, error) {
if err != nil {
return leveldbSnapshot{}, wrapLeveldbErr(err)
}
rel, err := newReleaser(b.closeWG)
if err != nil {
return leveldbSnapshot{}, err
}
return leveldbSnapshot{
snap: snap,
rel: newReleaser(&b.closeWG),
rel: rel,
}, nil
}
@ -47,16 +56,20 @@ func (b *leveldbBackend) NewWriteTransaction() (WriteTransaction, error) {
if err != nil {
return nil, err // already wrapped
}
rel, err := newReleaser(b.closeWG)
if err != nil {
return nil, err
}
return &leveldbTransaction{
leveldbSnapshot: snap,
ldb: b.ldb,
batch: new(leveldb.Batch),
rel: newReleaser(&b.closeWG),
rel: rel,
}, nil
}
func (b *leveldbBackend) Close() error {
b.closeWG.Wait()
b.closeWG.CloseWait()
return wrapLeveldbErr(b.ldb.Close())
}
@ -82,6 +95,13 @@ func (b *leveldbBackend) Delete(key []byte) error {
}
func (b *leveldbBackend) Compact() error {
// Race is detected during testing when db is closed while compaction
// is ongoing.
err := b.closeWG.Add(1)
if err != nil {
return err
}
defer b.closeWG.Done()
return wrapLeveldbErr(b.ldb.CompactRange(util.Range{}))
}

View File

@ -42,7 +42,7 @@ func OpenLevelDB(location string, tuning Tuning) (Backend, error) {
if err != nil {
return nil, err
}
return &leveldbBackend{ldb: ldb}, nil
return newLeveldbBackend(ldb), nil
}
// OpenRO attempts to open the database at the given location, read only.
@ -55,13 +55,13 @@ func OpenLevelDBRO(location string) (Backend, error) {
if err != nil {
return nil, err
}
return &leveldbBackend{ldb: ldb}, nil
return newLeveldbBackend(ldb), nil
}
// OpenMemory returns a new Backend referencing an in-memory database.
func OpenLevelDBMemory() Backend {
ldb, _ := leveldb.Open(storage.NewMemStorage(), nil)
return &leveldbBackend{ldb: ldb}
return newLeveldbBackend(ldb)
}
// optsFor returns the database options to use when opening a database with

View File

@ -106,6 +106,7 @@ func discardFromBlockMap(db *Lowlevel, folder []byte, fs []protocol.FileInfo) er
func TestBlockMapAddUpdateWipe(t *testing.T) {
db, f := setup()
defer db.Close()
if !dbEmpty(db) {
t.Fatal("db not empty")
@ -193,6 +194,7 @@ func TestBlockMapAddUpdateWipe(t *testing.T) {
func TestBlockFinderLookup(t *testing.T) {
db, f := setup()
defer db.Close()
folder1 := []byte("folder1")
folder2 := []byte("folder2")

View File

@ -33,6 +33,7 @@ func TestIgnoredFiles(t *testing.T) {
t.Fatal(err)
}
db := NewLowlevel(ldb)
defer db.Close()
if err := UpdateSchema(db); err != nil {
t.Fatal(err)
}
@ -161,6 +162,7 @@ func TestUpdate0to3(t *testing.T) {
}
db := NewLowlevel(ldb)
defer db.Close()
updater := schemaUpdater{db}
folder := []byte(update0to3Folder)
@ -173,6 +175,7 @@ func TestUpdate0to3(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer trans.Release()
if _, ok, err := trans.getFile(folder, protocol.LocalDeviceID[:], []byte(slashPrefixed)); err != nil {
t.Fatal(err)
} else if ok {
@ -196,6 +199,7 @@ func TestUpdate0to3(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer trans.Release()
_ = trans.withHaveSequence(folder, 0, func(fi FileIntf) bool {
f := fi.(protocol.FileInfo)
l.Infoln(f)
@ -227,6 +231,7 @@ func TestUpdate0to3(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer trans.Release()
_ = trans.withNeed(folder, protocol.LocalDeviceID[:], false, func(fi FileIntf) bool {
e, ok := need[fi.FileName()]
if !ok {
@ -246,6 +251,7 @@ func TestUpdate0to3(t *testing.T) {
func TestDowngrade(t *testing.T) {
db := NewLowlevel(backend.OpenMemory())
defer db.Close()
// sets the min version etc
if err := UpdateSchema(db); err != nil {
t.Fatal(err)

View File

@ -19,6 +19,7 @@ func TestDeviceKey(t *testing.T) {
name := []byte("name")
db := NewLowlevel(backend.OpenMemory())
defer db.Close()
key, err := db.keyer.GenerateDeviceFileKey(nil, fld, dev, name)
if err != nil {
@ -50,6 +51,7 @@ func TestGlobalKey(t *testing.T) {
name := []byte("name")
db := NewLowlevel(backend.OpenMemory())
defer db.Close()
key, err := db.keyer.GenerateGlobalVersionKey(nil, fld, name)
if err != nil {
@ -78,6 +80,7 @@ func TestSequenceKey(t *testing.T) {
fld := []byte("folder6789012345678901234567890123456789012345678901234567890123")
db := NewLowlevel(backend.OpenMemory())
defer db.Close()
const seq = 1234567890
key, err := db.keyer.GenerateSequenceKey(nil, fld, seq)

View File

@ -15,6 +15,7 @@ import (
func TestNamespacedInt(t *testing.T) {
ldb := NewLowlevel(backend.OpenMemory())
defer ldb.Close()
n1 := NewNamespacedKV(ldb, "foo")
n2 := NewNamespacedKV(ldb, "bar")
@ -62,6 +63,7 @@ func TestNamespacedInt(t *testing.T) {
func TestNamespacedTime(t *testing.T) {
ldb := NewLowlevel(backend.OpenMemory())
defer ldb.Close()
n1 := NewNamespacedKV(ldb, "foo")
@ -85,6 +87,7 @@ func TestNamespacedTime(t *testing.T) {
func TestNamespacedString(t *testing.T) {
ldb := NewLowlevel(backend.OpenMemory())
defer ldb.Close()
n1 := NewNamespacedKV(ldb, "foo")
@ -107,6 +110,7 @@ func TestNamespacedString(t *testing.T) {
func TestNamespacedReset(t *testing.T) {
ldb := NewLowlevel(backend.OpenMemory())
defer ldb.Close()
n1 := NewNamespacedKV(ldb, "foo")

View File

@ -129,6 +129,7 @@ func (l fileList) String() string {
func TestGlobalSet(t *testing.T) {
ldb := db.NewLowlevel(backend.OpenMemory())
defer ldb.Close()
m := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb)
@ -346,6 +347,7 @@ func TestGlobalSet(t *testing.T) {
func TestNeedWithInvalid(t *testing.T) {
ldb := db.NewLowlevel(backend.OpenMemory())
defer ldb.Close()
s := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb)
@ -383,6 +385,7 @@ func TestNeedWithInvalid(t *testing.T) {
func TestUpdateToInvalid(t *testing.T) {
ldb := db.NewLowlevel(backend.OpenMemory())
defer ldb.Close()
folder := "test"
s := db.NewFileSet(folder, fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb)
@ -439,6 +442,7 @@ func TestUpdateToInvalid(t *testing.T) {
func TestInvalidAvailability(t *testing.T) {
ldb := db.NewLowlevel(backend.OpenMemory())
defer ldb.Close()
s := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb)
@ -480,6 +484,7 @@ func TestInvalidAvailability(t *testing.T) {
func TestGlobalReset(t *testing.T) {
ldb := db.NewLowlevel(backend.OpenMemory())
defer ldb.Close()
m := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb)
@ -518,6 +523,7 @@ func TestGlobalReset(t *testing.T) {
func TestNeed(t *testing.T) {
ldb := db.NewLowlevel(backend.OpenMemory())
defer ldb.Close()
m := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb)
@ -556,6 +562,7 @@ func TestNeed(t *testing.T) {
func TestSequence(t *testing.T) {
ldb := db.NewLowlevel(backend.OpenMemory())
defer ldb.Close()
m := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb)
@ -586,6 +593,7 @@ func TestSequence(t *testing.T) {
func TestListDropFolder(t *testing.T) {
ldb := db.NewLowlevel(backend.OpenMemory())
defer ldb.Close()
s0 := db.NewFileSet("test0", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb)
local1 := []protocol.FileInfo{
@ -636,6 +644,7 @@ func TestListDropFolder(t *testing.T) {
func TestGlobalNeedWithInvalid(t *testing.T) {
ldb := db.NewLowlevel(backend.OpenMemory())
defer ldb.Close()
s := db.NewFileSet("test1", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb)
@ -677,6 +686,7 @@ func TestGlobalNeedWithInvalid(t *testing.T) {
func TestLongPath(t *testing.T) {
ldb := db.NewLowlevel(backend.OpenMemory())
defer ldb.Close()
s := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb)
@ -736,6 +746,7 @@ func BenchmarkUpdateOneFile(b *testing.B) {
func TestIndexID(t *testing.T) {
ldb := db.NewLowlevel(backend.OpenMemory())
defer ldb.Close()
s := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb)
@ -831,6 +842,7 @@ func TestDropFiles(t *testing.T) {
func TestIssue4701(t *testing.T) {
ldb := db.NewLowlevel(backend.OpenMemory())
defer ldb.Close()
s := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb)
@ -872,6 +884,7 @@ func TestIssue4701(t *testing.T) {
func TestWithHaveSequence(t *testing.T) {
ldb := db.NewLowlevel(backend.OpenMemory())
defer ldb.Close()
folder := "test"
s := db.NewFileSet(folder, fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb)
@ -909,6 +922,7 @@ func TestStressWithHaveSequence(t *testing.T) {
}
ldb := db.NewLowlevel(backend.OpenMemory())
defer ldb.Close()
folder := "test"
s := db.NewFileSet(folder, fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb)
@ -953,6 +967,7 @@ loop:
func TestIssue4925(t *testing.T) {
ldb := db.NewLowlevel(backend.OpenMemory())
defer ldb.Close()
folder := "test"
s := db.NewFileSet(folder, fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb)
@ -979,6 +994,7 @@ func TestIssue4925(t *testing.T) {
func TestMoveGlobalBack(t *testing.T) {
ldb := db.NewLowlevel(backend.OpenMemory())
defer ldb.Close()
folder := "test"
file := "foo"
@ -1043,6 +1059,7 @@ func TestMoveGlobalBack(t *testing.T) {
// https://github.com/syncthing/syncthing/issues/5007
func TestIssue5007(t *testing.T) {
ldb := db.NewLowlevel(backend.OpenMemory())
defer ldb.Close()
folder := "test"
file := "foo"
@ -1070,6 +1087,7 @@ func TestIssue5007(t *testing.T) {
// when the global file is deleted.
func TestNeedDeleted(t *testing.T) {
ldb := db.NewLowlevel(backend.OpenMemory())
defer ldb.Close()
folder := "test"
file := "foo"
@ -1104,6 +1122,7 @@ func TestNeedDeleted(t *testing.T) {
func TestReceiveOnlyAccounting(t *testing.T) {
ldb := db.NewLowlevel(backend.OpenMemory())
defer ldb.Close()
folder := "test"
s := db.NewFileSet(folder, fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb)
@ -1208,6 +1227,7 @@ func TestReceiveOnlyAccounting(t *testing.T) {
func TestNeedAfterUnignore(t *testing.T) {
ldb := db.NewLowlevel(backend.OpenMemory())
defer ldb.Close()
folder := "test"
file := "foo"
@ -1240,6 +1260,7 @@ func TestRemoteInvalidNotAccounted(t *testing.T) {
// Remote files with the invalid bit should not count.
ldb := db.NewLowlevel(backend.OpenMemory())
defer ldb.Close()
s := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb)
files := []protocol.FileInfo{
@ -1259,6 +1280,7 @@ func TestRemoteInvalidNotAccounted(t *testing.T) {
func TestNeedWithNewerInvalid(t *testing.T) {
ldb := db.NewLowlevel(backend.OpenMemory())
defer ldb.Close()
s := db.NewFileSet("default", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb)
@ -1297,6 +1319,7 @@ func TestNeedWithNewerInvalid(t *testing.T) {
func TestNeedAfterDeviceRemove(t *testing.T) {
ldb := db.NewLowlevel(backend.OpenMemory())
defer ldb.Close()
file := "foo"
s := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb)
@ -1324,6 +1347,7 @@ func TestCaseSensitive(t *testing.T) {
// Normal case sensitive lookup should work
ldb := db.NewLowlevel(backend.OpenMemory())
defer ldb.Close()
s := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb)
local := []protocol.FileInfo{
@ -1361,6 +1385,7 @@ func TestSequenceIndex(t *testing.T) {
// Set up a db and a few files that we will manipulate.
ldb := db.NewLowlevel(backend.OpenMemory())
defer ldb.Close()
s := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb)
local := []protocol.FileInfo{
@ -1454,6 +1479,7 @@ func TestSequenceIndex(t *testing.T) {
func TestIgnoreAfterReceiveOnly(t *testing.T) {
ldb := db.NewLowlevel(backend.OpenMemory())
defer ldb.Close()
file := "foo"
s := db.NewFileSet("test", fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb)