diff --git a/archiver_test.go b/archiver_test.go
index 007ed4115..63531b0b4 100644
--- a/archiver_test.go
+++ b/archiver_test.go
@@ -4,13 +4,16 @@ import (
 	"bytes"
 	"crypto/sha256"
 	"io"
+	"math"
 	"testing"
 
 	"github.com/restic/chunker"
 	"github.com/restic/restic"
 	"github.com/restic/restic/backend"
+	"github.com/restic/restic/checker"
 	"github.com/restic/restic/crypto"
 	"github.com/restic/restic/pack"
+	"github.com/restic/restic/repository"
 	. "github.com/restic/restic/test"
 )
 
@@ -21,6 +24,11 @@ type Rdr interface {
 	io.ReaderAt
 }
 
+type chunkedData struct {
+	buf    []byte
+	chunks []*chunker.Chunk
+}
+
 func benchmarkChunkEncrypt(b testing.TB, buf, buf2 []byte, rd Rdr, key *crypto.Key) {
 	rd.Seek(0, 0)
 	ch := chunker.New(rd, testPol, sha256.New())
@@ -233,3 +241,115 @@ func BenchmarkLoadTree(t *testing.B) {
 		}
 	}
 }
+
+// Saves several identical chunks concurrently and later checks that there are no
+// unreferenced packs in the repository. See also #292 and #358.
+// The combination of high duplication and high concurrency should provoke any
+// issues leading to unreferenced packs.
+func TestParallelSaveWithHighDuplication(t *testing.T) {
+	repo := SetupRepo()
+	defer TeardownRepo(repo)
+
+	// For every seed a pseudo-random 32 MiB blob is generated and split into
+	// chunks. During the test all chunks of all blobs are processed in parallel
+	// goroutines. To increase duplication, each chunk is processed <duplication>
+	// times. Concurrency can be limited by changing <maxParallel>.
+	// Note: seeds 5, 3, 66, 4, 12 produce the most chunks (descending).
+	seeds := []int{5, 3, 66, 4, 12}
+	maxParallel := math.MaxInt32
+	duplication := 15
+
+	arch := restic.NewArchiver(repo)
+	data := getRandomData(seeds)
+
+	barrier := make(chan struct{}, maxParallel)
+	errChannels := [](<-chan error){}
+
+	for _, d := range data {
+		for _, c := range d.chunks {
+			for dupIdx := 0; dupIdx < duplication; dupIdx++ {
+				errChan := make(chan error)
+				errChannels = append(errChannels, errChan)
+
+				go func(buf *[]byte, c *chunker.Chunk, errChan chan<- error) {
+					barrier <- struct{}{}
+
+					hash := c.Digest
+					id := backend.ID{}
+					copy(id[:], hash)
+
+					err := arch.Save(pack.Data, id, c.Length, c.Reader(bytes.NewReader(*buf)))
+					<-barrier
+					errChan <- err
+				}(&d.buf, c, errChan)
+			}
+		}
+	}
+
+	for _, errChan := range errChannels {
+		OK(t, <-errChan)
+	}
+
+	OK(t, repo.Flush())
+	OK(t, repo.SaveIndex())
+
+	chkr := createAndInitChecker(t, repo)
+	assertNoUnreferencedPacks(t, chkr)
+}
+
+func getRandomData(seeds []int) []*chunkedData {
+	chunks := []*chunkedData{}
+	sem := make(chan struct{}, len(seeds))
+
+	for _, seed := range seeds {
+		c := &chunkedData{}
+		chunks = append(chunks, c)
+
+		go func(seed int, data *chunkedData) {
+			data.buf = Random(seed, 32*1024*1024)
+			chunker := chunker.New(bytes.NewReader(data.buf), testPol, sha256.New())
+
+			for {
+				c, err := chunker.Next()
+				if err == io.EOF {
+					break
+				}
+				data.chunks = append(data.chunks, c)
+			}
+
+			sem <- struct{}{}
+		}(seed, c)
+	}
+
+	for i := 0; i < len(seeds); i++ {
+		<-sem
+	}
+	return chunks
+}
+
+func createAndInitChecker(t *testing.T, repo *repository.Repository) *checker.Checker {
+	chkr := checker.New(repo)
+
+	hints, errs := chkr.LoadIndex()
+	if len(errs) > 0 {
+		t.Fatalf("expected no errors, got %v: %v", len(errs), errs)
+	}
+
+	if len(hints) > 0 {
+		t.Errorf("expected no hints, got %v: %v", len(hints), hints)
+	}
+
+	return chkr
+}
+
+func assertNoUnreferencedPacks(t *testing.T, chkr *checker.Checker) {
+	done := make(chan struct{})
+	defer close(done)
+
+	errChan := make(chan error)
+	go chkr.Packs(errChan, done)
+
+	for err := range errChan {
+		OK(t, err)
+	}
+}
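
Note for reviewers: the test bounds goroutine concurrency with a buffered channel used as a counting semaphore (the barrier channel above; its capacity is maxParallel, so math.MaxInt32 effectively disables the limit). Below is a minimal standalone sketch of the same pattern, with illustrative names that are not part of this patch:

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        const maxParallel = 4 // small limit for the sketch; the test uses math.MaxInt32

        sem := make(chan struct{}, maxParallel) // buffered channel as counting semaphore
        var wg sync.WaitGroup

        for i := 0; i < 20; i++ {
            wg.Add(1)
            go func(i int) {
                defer wg.Done()
                sem <- struct{}{}        // acquire: blocks while maxParallel goroutines hold slots
                defer func() { <-sem }() // release the slot when done
                fmt.Println("worker", i)
            }(i)
        }

        wg.Wait()
    }

Sending into the channel acquires a slot and blocks once the buffer is full; receiving releases it, so at most maxParallel goroutines execute the guarded section at any time.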