From 0ebee92f7df3e2a8924ec9d395cae63d361aaa27 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Tue, 7 Oct 2014 16:03:17 +0200 Subject: [PATCH] Test case and goleveldb fix (fixes #740, fixes #796) --- Godeps/Godeps.json | 2 +- .../syndtr/goleveldb/leveldb/db_snapshot.go | 2 +- .../syndtr/goleveldb/leveldb/db_test.go | 138 ++++++++++++++ internal/files/concurrency_test.go | 168 ++++++++++++++++++ internal/files/set_test.go | 61 ------- 5 files changed, 308 insertions(+), 63 deletions(-) create mode 100644 internal/files/concurrency_test.go diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 316f71d6f..45f6224c7 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -48,7 +48,7 @@ }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb", - "Rev": "e2fa4e6ac1cc41a73bc9fd467878ecbf65df5cc3" + "Rev": "0d8857b7ec571b0a6c9677d8e6c0a4ceeabd1d71" }, { "ImportPath": "github.com/vitrun/qart/coding", diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go index fb1ce85b9..e8f50e3b0 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go @@ -71,7 +71,7 @@ func (db *DB) releaseSnapshot(elem *snapshotElement) { func (db *DB) minSeq() uint64 { db.snapsMu.Lock() defer db.snapsMu.Unlock() - elem := db.snapsRoot.prev + elem := db.snapsRoot.next if elem != &db.snapsRoot { return elem.seq } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go index b71906813..88da205ff 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go @@ -1125,6 +1125,45 @@ func TestDb_Snapshot(t *testing.T) { }) } +func TestDb_SnapshotList(t *testing.T) { + db := &DB{} + db.initSnapshot() + e0a := db.acquireSnapshot() + e0b := db.acquireSnapshot() + db.seq = 1 + e1 := db.acquireSnapshot() + db.seq = 2 + e2 := db.acquireSnapshot() + + if db.minSeq() != 0 { + t.Fatalf("invalid sequence number, got=%d", db.minSeq()) + } + db.releaseSnapshot(e0a) + if db.minSeq() != 0 { + t.Fatalf("invalid sequence number, got=%d", db.minSeq()) + } + db.releaseSnapshot(e2) + if db.minSeq() != 0 { + t.Fatalf("invalid sequence number, got=%d", db.minSeq()) + } + db.releaseSnapshot(e0b) + if db.minSeq() != 1 { + t.Fatalf("invalid sequence number, got=%d", db.minSeq()) + } + e2 = db.acquireSnapshot() + if db.minSeq() != 1 { + t.Fatalf("invalid sequence number, got=%d", db.minSeq()) + } + db.releaseSnapshot(e1) + if db.minSeq() != 2 { + t.Fatalf("invalid sequence number, got=%d", db.minSeq()) + } + db.releaseSnapshot(e2) + if db.minSeq() != 2 { + t.Fatalf("invalid sequence number, got=%d", db.minSeq()) + } +} + func TestDb_HiddenValuesAreRemoved(t *testing.T) { trun(t, func(h *dbHarness) { s := h.db.s @@ -1884,3 +1923,102 @@ func TestDb_LeveldbIssue200(t *testing.T) { iter.Next() assertBytes(t, []byte("5"), iter.Key()) } + +func TestDb_GoleveldbIssue74(t *testing.T) { + h := newDbHarnessWopt(t, &opt.Options{ + WriteBuffer: 1 * opt.MiB, + }) + defer h.close() + + const n, dur = 10000, 5 * time.Second + + runtime.GOMAXPROCS(runtime.NumCPU()) + + until := time.Now().Add(dur) + wg := new(sync.WaitGroup) + wg.Add(2) + var done uint32 + go func() { + var i int + defer func() { + t.Logf("WRITER DONE #%d", i) + atomic.StoreUint32(&done, 1) + wg.Done() + }() + + b := new(Batch) + for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ { + iv := fmt.Sprintf("VAL%010d", i) + for k := 0; k < n; k++ { + key := fmt.Sprintf("KEY%06d", k) + b.Put([]byte(key), []byte(key+iv)) + b.Put([]byte(fmt.Sprintf("PTR%06d", k)), []byte(key)) + } + h.write(b) + + b.Reset() + snap := h.getSnapshot() + iter := snap.NewIterator(util.BytesPrefix([]byte("PTR")), nil) + var k int + for ; iter.Next(); k++ { + ptrKey := iter.Key() + key := iter.Value() + + if _, err := snap.Get(ptrKey, nil); err != nil { + t.Fatalf("WRITER #%d snapshot.Get %q: %v", i, ptrKey, err) + } + if value, err := snap.Get(key, nil); err != nil { + t.Fatalf("WRITER #%d snapshot.Get %q: %v", i, key, err) + } else if string(value) != string(key)+iv { + t.Fatalf("WRITER #%d snapshot.Get %q got invalid value, want %q got %q", i, key, string(key)+iv, value) + } + + b.Delete(key) + b.Delete(ptrKey) + } + h.write(b) + iter.Release() + snap.Release() + if k != n { + t.Fatalf("#%d %d != %d", i, k, n) + } + } + t.Logf("writer done after %d iterations", i) + }() + go func() { + var i int + defer func() { + t.Logf("READER DONE #%d", i) + atomic.StoreUint32(&done, 1) + wg.Done() + }() + for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ { + snap := h.getSnapshot() + iter := snap.NewIterator(util.BytesPrefix([]byte("PTR")), nil) + var prevValue string + var k int + for ; iter.Next(); k++ { + ptrKey := iter.Key() + key := iter.Value() + + if _, err := snap.Get(ptrKey, nil); err != nil { + t.Fatalf("READER #%d snapshot.Get %q: %v", i, ptrKey, err) + } + + if value, err := snap.Get(key, nil); err != nil { + t.Fatalf("READER #%d snapshot.Get %q: %v", i, key, err) + } else if prevValue != "" && string(value) != string(key)+prevValue { + t.Fatalf("READER #%d snapshot.Get %q got invalid value, want %q got %q", i, key, string(key)+prevValue, value) + } else { + prevValue = string(value[len(key):]) + } + } + iter.Release() + snap.Release() + if k > 0 && k != n { + t.Fatalf("#%d %d != %d", i, k, n) + } + } + }() + wg.Wait() +} diff --git a/internal/files/concurrency_test.go b/internal/files/concurrency_test.go new file mode 100644 index 000000000..be3f8afc2 --- /dev/null +++ b/internal/files/concurrency_test.go @@ -0,0 +1,168 @@ +package files_test + +import ( + "crypto/rand" + "fmt" + "log" + "os" + "sync" + "testing" + "time" + + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/util" +) + +var items map[string][]byte +var keys map[string]string + +const nItems = 10000 + +func setupMaps() { + + // Set up two simple maps, one "key" => data and one "indirect key" => + // "key". + + items = make(map[string][]byte, nItems) + keys = make(map[string]string, nItems) + + for i := 0; i < nItems; i++ { + k1 := fmt.Sprintf("key%d", i) + data := make([]byte, 87) + _, err := rand.Reader.Read(data) + if err != nil { + panic(err) + } + items[k1] = data + + k2 := fmt.Sprintf("indirect%d", i) + keys[k2] = k1 + } +} + +func makeK1(s string) []byte { + k1 := make([]byte, 1+len(s)) + k1[0] = 1 + copy(k1[1:], []byte(s)) + return k1 +} + +func makeK2(s string) []byte { + k2 := make([]byte, 1+len(s)) + k2[0] = 2 // Only difference from makeK1 + copy(k2[1:], []byte(s)) + return k2 +} + +func setItems(db *leveldb.DB) error { + snap, err := db.GetSnapshot() + if err != nil { + return err + } + defer snap.Release() + + batch := &leveldb.Batch{} + for k2, k1 := range keys { + // Create k1 => item mapping first + batch.Put(makeK1(k1), items[k1]) + // Then the k2 => k1 mapping + batch.Put(makeK2(k2), makeK1(k1)) + } + return db.Write(batch, nil) +} + +func clearItems(db *leveldb.DB) error { + snap, err := db.GetSnapshot() + if err != nil { + return err + } + defer snap.Release() + + // Iterate from the start of k2 space to the end + it := snap.NewIterator(&util.Range{Start: []byte{2}, Limit: []byte{2, 0xff, 0xff, 0xff, 0xff}}, nil) + defer it.Release() + + batch := &leveldb.Batch{} + for it.Next() { + k2 := it.Key() + k1 := it.Value() + + // k1 should exist + _, err := snap.Get(k1, nil) + if err != nil { + return err + } + + // Delete the k2 => k1 mapping first + batch.Delete(k2) + // Then the k1 => key mapping + batch.Delete(k1) + } + return db.Write(batch, nil) +} + +func scanItems(db *leveldb.DB) error { + snap, err := db.GetSnapshot() + if err != nil { + return err + } + defer snap.Release() + + // Iterate from the start of k2 space to the end + it := snap.NewIterator(&util.Range{Start: []byte{2}, Limit: []byte{2, 0xff, 0xff, 0xff, 0xff}}, nil) + defer it.Release() + + for it.Next() { + // k2 => k1 => data + k2 := it.Key() + k1 := it.Value() + _, err := snap.Get(k1, nil) + if err != nil { + log.Printf("k1: %q (%x)", k1, k1) + log.Printf("k2: %q (%x)", k2, k2) + return err + } + } + return nil +} + +func TestConcurrent(t *testing.T) { + setupMaps() + + dur := 2 * time.Second + t0 := time.Now() + var wg sync.WaitGroup + + os.RemoveAll("testdata/global.db") + db, err := leveldb.OpenFile("testdata/global.db", nil) + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll("testdata/global.db") + + wg.Add(1) + go func() { + defer wg.Done() + for time.Since(t0) < dur { + if err := setItems(db); err != nil { + t.Fatal(err) + } + if err := clearItems(db); err != nil { + t.Fatal(err) + } + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + for time.Since(t0) < dur { + if err := scanItems(db); err != nil { + t.Fatal(err) + } + } + }() + + wg.Wait() + db.Close() +} diff --git a/internal/files/set_test.go b/internal/files/set_test.go index 70e853e8c..6dfdb0eeb 100644 --- a/internal/files/set_test.go +++ b/internal/files/set_test.go @@ -862,64 +862,3 @@ func TestLongPath(t *testing.T) { gf[0].Name, local[0].Name) } } - -/* -var gf protocol.FileInfo - -func TestStressGlobalVersion(t *testing.T) { - dur := 15 * time.Second - if testing.Short() { - dur = 1 * time.Second - } - - set1 := []protocol.FileInfo{ - protocol.FileInfo{Name: "a", Version: 1000}, - protocol.FileInfo{Name: "b", Version: 1000}, - } - set2 := []protocol.FileInfo{ - protocol.FileInfo{Name: "b", Version: 1001}, - protocol.FileInfo{Name: "c", Version: 1000}, - } - - db, err := leveldb.OpenFile("testdata/global.db", nil) - if err != nil { - t.Fatal(err) - } - - m := files.NewSet("test", db) - - done := make(chan struct{}) - go stressWriter(m, remoteDevice0, set1, nil, done) - go stressWriter(m, protocol.LocalDeviceID, set2, nil, done) - - t0 := time.Now() - for time.Since(t0) < dur { - m.WithGlobal(func(f protocol.FileInfo) bool { - gf = f - return true - }) - } - - close(done) -} - -func stressWriter(s *files.Set, id protocol.DeviceID, set1, set2 []protocol.FileInfo, done chan struct{}) { - one := true - i := 0 - for { - select { - case <-done: - return - - default: - if one { - s.Replace(id, set1) - } else { - s.Replace(id, set2) - } - one = !one - } - i++ - } -} -*/