From 6a548336ec799a6e080b63d406e402038c6db16a Mon Sep 17 00:00:00 2001 From: Philipp Serr Date: Sun, 6 Dec 2015 17:35:22 +0100 Subject: [PATCH] Add a test concurrently saving duplicated chunks This commit adds an integration test, that calls Archiver.Save from many goroutines processing several duplicated chunks concurrently. The test asserts, that after all chunks have been saved, there are no unreferenced packs in the repository. The test has been checked to give the expected results: 1) Running the test with maxParallel=1 (all chunks are processed sequentially) has been verified not to produce any unreferenced packs. Consequently the test passes. 2) Running the test with unbounded parallelism (maxParallel= math.MaxInt32) has been verified to produce unreferenced packs all the time (at least 25 test runs). Consequently the test fails due to #358. references: #358 --- archiver_test.go | 120 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 120 insertions(+) diff --git a/archiver_test.go b/archiver_test.go index 007ed4115..63531b0b4 100644 --- a/archiver_test.go +++ b/archiver_test.go @@ -4,13 +4,16 @@ import ( "bytes" "crypto/sha256" "io" + "math" "testing" "github.com/restic/chunker" "github.com/restic/restic" "github.com/restic/restic/backend" + "github.com/restic/restic/checker" "github.com/restic/restic/crypto" "github.com/restic/restic/pack" + "github.com/restic/restic/repository" . "github.com/restic/restic/test" ) @@ -21,6 +24,11 @@ type Rdr interface { io.ReaderAt } +type chunkedData struct { + buf []byte + chunks []*chunker.Chunk +} + func benchmarkChunkEncrypt(b testing.TB, buf, buf2 []byte, rd Rdr, key *crypto.Key) { rd.Seek(0, 0) ch := chunker.New(rd, testPol, sha256.New()) @@ -233,3 +241,115 @@ func BenchmarkLoadTree(t *testing.B) { } } } + +// Saves several identical chunks concurrently and later check that there are no +// unreferenced packs in the repository. See also #292 and #358. 
+// The combination of high duplication and high concurrency should provoke any
+// issues leading to unreferenced packs.
+func TestParallelSaveWithHighDuplication(t *testing.T) {
+	repo := SetupRepo()
+	defer TeardownRepo(repo)
+
+	// For every seed a pseudo-random 32Mb blob is generated and split into
+	// chunks. During the test all chunks of all blobs are processed in parallel
+	// goroutines. To increase duplication, each chunk is processed duplication
+	// times. Concurrency can be limited by changing maxParallel.
+	// Note: seeds 5, 3, 66, 4, 12 produce the most chunks (descending)
+	seeds := []int{5, 3, 66, 4, 12}
+	maxParallel := math.MaxInt32
+	duplication := 15
+
+	arch := restic.NewArchiver(repo)
+	data := getRandomData(seeds)
+
+	barrier := make(chan struct{}, maxParallel)
+	errChannels := [](<-chan error){}
+
+	for _, d := range data {
+		for _, c := range d.chunks {
+			for dupIdx := 0; dupIdx < duplication; dupIdx++ {
+				errChan := make(chan error)
+				errChannels = append(errChannels, errChan)
+
+				go func(buf *[]byte, c *chunker.Chunk, errChan chan<- error) {
+					barrier <- struct{}{}
+
+					hash := c.Digest
+					id := backend.ID{}
+					copy(id[:], hash)
+
+					err := arch.Save(pack.Data, id, c.Length, c.Reader(bytes.NewReader(*buf)))
+					<-barrier
+					errChan <- err
+				}(&d.buf, c, errChan)
+			}
+		}
+	}
+
+	for _, errChan := range errChannels {
+		OK(t, <-errChan)
+	}
+
+	OK(t, repo.Flush())
+	OK(t, repo.SaveIndex())
+
+	chkr := createAndInitChecker(t, repo)
+	assertNoUnreferencedPacks(t, chkr)
+}
+
+func getRandomData(seeds []int) []*chunkedData {
+	chunks := []*chunkedData{}
+	sem := make(chan struct{}, len(seeds))
+
+	for _, seed := range seeds {
+		c := &chunkedData{}
+		chunks = append(chunks, c)
+
+		go func(seed int, data *chunkedData) {
+			data.buf = Random(seed, 32*1024*1024)
+			ch := chunker.New(bytes.NewReader(data.buf), testPol, sha256.New())
+
+			for {
+				c, err := ch.Next()
+				if err == io.EOF {
+					break
+				}
+				data.chunks = append(data.chunks, c)
+			}
+
+			sem <- struct{}{}
+		}(seed, c)
+	}
+
+	for i := 0; i < len(seeds); i++ {
+		<-sem
+	}
+	return chunks
+}
+
+func createAndInitChecker(t *testing.T, repo *repository.Repository) *checker.Checker {
+	chkr := checker.New(repo)
+
+	hints, errs := chkr.LoadIndex()
+	if len(errs) > 0 {
+		t.Fatalf("expected no errors, got %v: %v", len(errs), errs)
+	}
+
+	if len(hints) > 0 {
+		t.Errorf("expected no hints, got %v: %v", len(hints), hints)
+	}
+
+	return chkr
+}
+
+func assertNoUnreferencedPacks(t *testing.T, chkr *checker.Checker) {
+	done := make(chan struct{})
+	defer close(done)
+
+	errChan := make(chan error)
+	go chkr.Packs(errChan, done)
+
+	for err := range errChan {
+		OK(t, err)
+	}
+}