diff --git a/internal/files/.gitignore b/internal/files/.gitignore new file mode 100644 index 000000000..54f299079 --- /dev/null +++ b/internal/files/.gitignore @@ -0,0 +1 @@ +testdata/*.db diff --git a/internal/files/concurrency2_test.go b/internal/files/concurrency2_test.go deleted file mode 100644 index d74a90265..000000000 --- a/internal/files/concurrency2_test.go +++ /dev/null @@ -1,206 +0,0 @@ -// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). -// -// This program is free software: you can redistribute it and/or modify it -// under the terms of the GNU General Public License as published by the Free -// Software Foundation, either version 3 of the License, or (at your option) -// any later version. -// -// This program is distributed in the hope that it will be useful, but WITHOUT -// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -// more details. -// -// You should have received a copy of the GNU General Public License along -// with this program. If not, see . - -package files_test - -import ( - "fmt" - "log" - "math/rand" - "os" - "runtime" - "sync" - "testing" - "time" - - "github.com/syncthing/syncthing/internal/files" - "github.com/syncthing/syncthing/internal/protocol" - "github.com/syndtr/goleveldb/leveldb" - "github.com/syndtr/goleveldb/leveldb/opt" -) - -func TestLongConcurrent(t *testing.T) { - if testing.Short() || runtime.GOMAXPROCS(-1) < 4 { - return - } - - os.RemoveAll("/tmp/test.db") - db, err := leveldb.OpenFile("/tmp/test.db", &opt.Options{CachedOpenFiles: 100}) - if err != nil { - t.Fatal(err) - } - - start := make(chan struct{}) - - log.Println("preparing") - var wg sync.WaitGroup - for i := 0; i < runtime.GOMAXPROCS(-1); i++ { - i := i - rem0, rem1 := generateFiles() - wg.Add(1) - go func() { - defer wg.Done() - longConcurrentTest(db, fmt.Sprintf("folder%d", i), rem0, rem1, start) - }() - } - - log.Println("starting") - close(start) - wg.Wait() -} - -func generateFiles() ([]protocol.FileInfo, []protocol.FileInfo) { - var rem0, rem1 fileList - - for i := 0; i < 10000; i++ { - n := rand.Int() - rem0 = append(rem0, protocol.FileInfo{ - Name: fmt.Sprintf("path/path/path/path/path/path/path%d/path%d/path%d/file%d", n, n, n, n), - Version: uint64(rand.Int63()), - Blocks: genBlocks(rand.Intn(25)), - Flags: uint32(rand.Int31()), - }) - } - - for i := 0; i < 10000; i++ { - if i%2 == 0 { - // Same file as rem0, randomly newer or older - f := rem0[i] - f.Version = uint64(rand.Int63()) - rem1 = append(rem1, f) - } else { - // Different file - n := rand.Int() - f := protocol.FileInfo{ - Name: fmt.Sprintf("path/path/path/path/path/path/path%d/path%d/path%d/file%d", n, n, n, n), - Version: uint64(rand.Int63()), - Blocks: genBlocks(rand.Intn(25)), - Flags: uint32(rand.Int31()), - } - rem1 = append(rem1, f) - } - } - - return rem0, rem1 -} - -func longConcurrentTest(db *leveldb.DB, folder string, rem0, rem1 []protocol.FileInfo, start chan struct{}) { - s := files.NewSet(folder, db) - - <-start - - t0 := time.Now() - cont := func() bool { - return time.Since(t0) < 60*time.Second - } - - log.Println(folder, "start") - - var wg sync.WaitGroup - - // Fast updater - - wg.Add(1) - go func() { - defer wg.Done() - for cont() { - log.Println(folder, "u0") - for i := 0; i < 10000; i += 250 { - s.Update(remoteDevice0, rem0[i:i+250]) - } - time.Sleep(25 * time.Millisecond) - s.Replace(remoteDevice0, nil) - time.Sleep(25 * time.Millisecond) - } - }() - - // Fast updater - - wg.Add(1) - go func() { - defer wg.Done() - for cont() { - log.Println(folder, "u1") - for i := 0; i < 10000; i += 250 { - s.Update(remoteDevice1, rem1[i:i+250]) - } - time.Sleep(25 * time.Millisecond) - s.Replace(remoteDevice1, nil) - time.Sleep(25 * time.Millisecond) - } - }() - - // Fast need list - - wg.Add(1) - go func() { - defer wg.Done() - for cont() { - needList(s, protocol.LocalDeviceID) - time.Sleep(25 * time.Millisecond) - } - }() - - // Fast global list - - wg.Add(1) - go func() { - defer wg.Done() - for cont() { - globalList(s) - time.Sleep(25 * time.Millisecond) - } - }() - - // Long running need lists - - go func() { - for i := 0; i < 10; i++ { - time.Sleep(25 * time.Millisecond) - wg.Add(1) - go func() { - defer wg.Done() - for cont() { - s.WithNeed(protocol.LocalDeviceID, func(intf protocol.FileIntf) bool { - time.Sleep(50 * time.Millisecond) - return cont() - }) - } - }() - } - }() - - // Long running global lists - - go func() { - for i := 0; i < 10; i++ { - time.Sleep(25 * time.Millisecond) - wg.Add(1) - go func() { - defer wg.Done() - for cont() { - s.WithGlobal(func(intf protocol.FileIntf) bool { - time.Sleep(50 * time.Millisecond) - return cont() - }) - } - }() - } - }() - - wg.Wait() - - log.Println(folder, "done") -} diff --git a/internal/files/concurrency_test.go b/internal/files/concurrency_test.go index be3f8afc2..fb815c16d 100644 --- a/internal/files/concurrency_test.go +++ b/internal/files/concurrency_test.go @@ -2,7 +2,6 @@ package files_test import ( "crypto/rand" - "fmt" "log" "os" "sync" @@ -10,63 +9,40 @@ import ( "time" "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/util" ) -var items map[string][]byte -var keys map[string]string +var keys [][]byte + +func init() { + for i := 0; i < nItems; i++ { + keys = append(keys, randomData(1)) + } +} const nItems = 10000 -func setupMaps() { - - // Set up two simple maps, one "key" => data and one "indirect key" => - // "key". - - items = make(map[string][]byte, nItems) - keys = make(map[string]string, nItems) - - for i := 0; i < nItems; i++ { - k1 := fmt.Sprintf("key%d", i) - data := make([]byte, 87) - _, err := rand.Reader.Read(data) - if err != nil { - panic(err) - } - items[k1] = data - - k2 := fmt.Sprintf("indirect%d", i) - keys[k2] = k1 +func randomData(prefix byte) []byte { + data := make([]byte, 1+32+64+32) + _, err := rand.Reader.Read(data) + if err != nil { + panic(err) } -} - -func makeK1(s string) []byte { - k1 := make([]byte, 1+len(s)) - k1[0] = 1 - copy(k1[1:], []byte(s)) - return k1 -} - -func makeK2(s string) []byte { - k2 := make([]byte, 1+len(s)) - k2[0] = 2 // Only difference from makeK1 - copy(k2[1:], []byte(s)) - return k2 + return append([]byte{prefix}, data...) } func setItems(db *leveldb.DB) error { - snap, err := db.GetSnapshot() - if err != nil { - return err + batch := new(leveldb.Batch) + for _, k1 := range keys { + k2 := randomData(2) + // k2 -> data + batch.Put(k2, randomData(42)) + // k1 -> k2 + batch.Put(k1, k2) } - defer snap.Release() - - batch := &leveldb.Batch{} - for k2, k1 := range keys { - // Create k1 => item mapping first - batch.Put(makeK1(k1), items[k1]) - // Then the k2 => k1 mapping - batch.Put(makeK2(k2), makeK1(k1)) + if testing.Verbose() { + log.Printf("batch write (set) %p", batch) } return db.Write(batch, nil) } @@ -78,77 +54,100 @@ func clearItems(db *leveldb.DB) error { } defer snap.Release() - // Iterate from the start of k2 space to the end - it := snap.NewIterator(&util.Range{Start: []byte{2}, Limit: []byte{2, 0xff, 0xff, 0xff, 0xff}}, nil) + // Iterate over k2 + + it := snap.NewIterator(util.BytesPrefix([]byte{1}), nil) defer it.Release() - batch := &leveldb.Batch{} + batch := new(leveldb.Batch) for it.Next() { - k2 := it.Key() - k1 := it.Value() + k1 := it.Key() + k2 := it.Value() - // k1 should exist - _, err := snap.Get(k1, nil) + // k2 should exist + _, err := snap.Get(k2, nil) if err != nil { return err } - // Delete the k2 => k1 mapping first - batch.Delete(k2) - // Then the k1 => key mapping + // Delete the k1 => k2 mapping first batch.Delete(k1) + // Then the k2 => data mapping + batch.Delete(k2) + } + if testing.Verbose() { + log.Printf("batch write (clear) %p", batch) } return db.Write(batch, nil) } func scanItems(db *leveldb.DB) error { snap, err := db.GetSnapshot() + if testing.Verbose() { + log.Printf("snap create %p", snap) + } if err != nil { return err } - defer snap.Release() + defer func() { + if testing.Verbose() { + log.Printf("snap release %p", snap) + } + snap.Release() + }() // Iterate from the start of k2 space to the end - it := snap.NewIterator(&util.Range{Start: []byte{2}, Limit: []byte{2, 0xff, 0xff, 0xff, 0xff}}, nil) + it := snap.NewIterator(util.BytesPrefix([]byte{1}), nil) defer it.Release() + i := 0 for it.Next() { // k2 => k1 => data - k2 := it.Key() - k1 := it.Value() - _, err := snap.Get(k1, nil) + k1 := it.Key() + k2 := it.Value() + _, err := snap.Get(k2, nil) if err != nil { - log.Printf("k1: %q (%x)", k1, k1) - log.Printf("k2: %q (%x)", k2, k2) + log.Printf("k1: %x", k1) + log.Printf("k2: %x (missing)", k2) return err } + i++ + } + if testing.Verbose() { + log.Println("scanned", i) } return nil } -func TestConcurrent(t *testing.T) { - setupMaps() +func TestConcurrentSetClear(t *testing.T) { + if testing.Short() { + return + } - dur := 2 * time.Second + dur := 30 * time.Second t0 := time.Now() var wg sync.WaitGroup - os.RemoveAll("testdata/global.db") - db, err := leveldb.OpenFile("testdata/global.db", nil) + os.RemoveAll("testdata/concurrent-set-clear.db") + db, err := leveldb.OpenFile("testdata/concurrent-set-clear.db", &opt.Options{CachedOpenFiles: 10}) if err != nil { t.Fatal(err) } - defer os.RemoveAll("testdata/global.db") + defer os.RemoveAll("testdata/concurrent-set-clear.db") + + errChan := make(chan error, 3) wg.Add(1) go func() { defer wg.Done() for time.Since(t0) < dur { if err := setItems(db); err != nil { - t.Fatal(err) + errChan <- err + return } if err := clearItems(db); err != nil { - t.Fatal(err) + errChan <- err + return } } }() @@ -158,11 +157,71 @@ func TestConcurrent(t *testing.T) { defer wg.Done() for time.Since(t0) < dur { if err := scanItems(db); err != nil { - t.Fatal(err) + errChan <- err + return } } }() - wg.Wait() + go func() { + wg.Wait() + errChan <- nil + }() + + err = <-errChan + if err != nil { + t.Error(err) + } db.Close() } + +func TestConcurrentSetOnly(t *testing.T) { + if testing.Short() { + return + } + + dur := 30 * time.Second + t0 := time.Now() + var wg sync.WaitGroup + + os.RemoveAll("testdata/concurrent-set-only.db") + db, err := leveldb.OpenFile("testdata/concurrent-set-only.db", &opt.Options{CachedOpenFiles: 10}) + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll("testdata/concurrent-set-only.db") + + errChan := make(chan error, 3) + + wg.Add(1) + go func() { + defer wg.Done() + for time.Since(t0) < dur { + if err := setItems(db); err != nil { + errChan <- err + return + } + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + for time.Since(t0) < dur { + if err := scanItems(db); err != nil { + errChan <- err + return + } + } + }() + + go func() { + wg.Wait() + errChan <- nil + }() + + err = <-errChan + if err != nil { + t.Error(err) + } +} diff --git a/internal/files/leveldb.go b/internal/files/leveldb.go index c2e9e3186..16d8a1e72 100644 --- a/internal/files/leveldb.go +++ b/internal/files/leveldb.go @@ -174,6 +174,9 @@ func ldbGenericReplace(db *leveldb.DB, folder, device []byte, fs []protocol.File limit := deviceKey(folder, device, []byte{0xff, 0xff, 0xff, 0xff}) // after all folder/device files batch := new(leveldb.Batch) + if debugDB { + l.Debugf("new batch %p", batch) + } snap, err := db.GetSnapshot() if err != nil { panic(err) @@ -335,6 +338,9 @@ func ldbUpdate(db *leveldb.DB, folder, device []byte, fs []protocol.FileInfo) ui runtime.GC() batch := new(leveldb.Batch) + if debugDB { + l.Debugf("new batch %p", batch) + } snap, err := db.GetSnapshot() if err != nil { panic(err) @@ -963,7 +969,10 @@ func ldbCheckGlobals(db *leveldb.DB, folder []byte) { dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil) defer dbi.Release() - batch := &leveldb.Batch{} + batch := new(leveldb.Batch) + if debugDB { + l.Debugf("new batch %p", batch) + } for dbi.Next() { gk := dbi.Key() var vl versionList