diff --git a/archiver.go b/archiver.go index 529dd7cdb..23376e656 100644 --- a/archiver.go +++ b/archiver.go @@ -31,7 +31,11 @@ var archiverAllowAllFiles = func(string, os.FileInfo) bool { return true } // Archiver is used to backup a set of directories. type Archiver struct { - repo *repository.Repository + repo *repository.Repository + knownBlobs struct { + backend.IDSet + sync.Mutex + } blobToken chan struct{} @@ -45,6 +49,12 @@ func NewArchiver(repo *repository.Repository) *Archiver { arch := &Archiver{ repo: repo, blobToken: make(chan struct{}, maxConcurrentBlobs), + knownBlobs: struct { + backend.IDSet + sync.Mutex + }{ + IDSet: backend.NewIDSet(), + }, } for i := 0; i < maxConcurrentBlobs; i++ { @@ -57,17 +67,37 @@ func NewArchiver(repo *repository.Repository) *Archiver { return arch } +// isKnownBlob returns true iff the blob is already known, i.e. contained in +// the list of known blobs or in the repository index. When the blob is not +// known, false is returned and the blob is added to the list; the caller that +// receives false is then responsible for saving the blob to the backend. +func (arch *Archiver) isKnownBlob(id backend.ID) bool { + arch.knownBlobs.Lock() + defer arch.knownBlobs.Unlock() + + if arch.knownBlobs.Has(id) { + return true + } + + arch.knownBlobs.Insert(id) + + _, err := arch.repo.Index().Lookup(id) + if err == nil { + return true + } + + return false +} + // Save stores a blob read from rd in the repository. 
func (arch *Archiver) Save(t pack.BlobType, id backend.ID, length uint, rd io.Reader) error { debug.Log("Archiver.Save", "Save(%v, %v)\n", t, id.Str()) - // test if this blob is already known - if arch.repo.Index().Has(id) { - debug.Log("Archiver.Save", "(%v, %v) already saved\n", t, id.Str()) + if arch.isKnownBlob(id) { + debug.Log("Archiver.Save", "blob %v is known\n", id.Str()) return nil } - // otherwise save blob err := arch.repo.SaveFrom(t, &id, length, rd) if err != nil { debug.Log("Archiver.Save", "Save(%v, %v): error %v\n", t, id.Str(), err) @@ -88,7 +118,7 @@ func (arch *Archiver) SaveTreeJSON(item interface{}) (backend.ID, error) { // check if tree has been saved before id := backend.Hash(data) - if arch.repo.Index().IsInFlight(id) || arch.repo.Index().Has(id) { + if arch.isKnownBlob(id) { return id, nil } diff --git a/archiver_duplication_test.go b/archiver_duplication_test.go new file mode 100644 index 000000000..f3e4c5475 --- /dev/null +++ b/archiver_duplication_test.go @@ -0,0 +1,150 @@ +package restic_test + +import ( + "bytes" + "crypto/rand" + "errors" + "io" + mrand "math/rand" + "sync" + "testing" + "time" + + "github.com/restic/restic" + "github.com/restic/restic/backend" + "github.com/restic/restic/pack" + "github.com/restic/restic/repository" +) + +const parallelSaves = 50 +const testSaveIndexTime = 100 * time.Millisecond +const testTimeout = 2 * time.Second + +var DupID backend.ID + +func randomID() backend.ID { + if mrand.Float32() < 0.5 { + return DupID + } + + id := backend.ID{} + _, err := io.ReadFull(rand.Reader, id[:]) + if err != nil { + panic(err) + } + return id +} + +// forgetfulBackend returns a backend that forgets everything. 
+func forgetfulBackend() backend.Backend { + be := &backend.MockBackend{} + + be.TestFn = func(t backend.Type, name string) (bool, error) { + return false, nil + } + + be.LoadFn = func(h backend.Handle, p []byte, off int64) (int, error) { + return 0, errors.New("not found") + } + + be.SaveFn = func(h backend.Handle, p []byte) error { + return nil + } + + be.StatFn = func(h backend.Handle) (backend.BlobInfo, error) { + return backend.BlobInfo{}, errors.New("not found") + } + + be.RemoveFn = func(t backend.Type, name string) error { + return nil + } + + be.ListFn = func(t backend.Type, done <-chan struct{}) <-chan string { + ch := make(chan string) + close(ch) + return ch + } + + be.DeleteFn = func() error { + return nil + } + + return be +} + +func testArchiverDuplication(t *testing.T) { + _, err := io.ReadFull(rand.Reader, DupID[:]) + if err != nil { + t.Fatal(err) + } + + repo := repository.New(forgetfulBackend()) + err = repo.Init("foo") + if err != nil { + t.Fatal(err) + } + + arch := restic.NewArchiver(repo) + + wg := &sync.WaitGroup{} + done := make(chan struct{}) + for i := 0; i < parallelSaves; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-done: + return + default: + } + + id := randomID() + + if repo.Index().Has(id) { + continue + } + + buf := make([]byte, 50) + + err := arch.Save(pack.Data, id, uint(len(buf)), bytes.NewReader(buf)) + if err != nil { + t.Fatal(err) + } + } + }() + } + + saveIndex := func() { + defer wg.Done() + + ticker := time.NewTicker(testSaveIndexTime) + defer ticker.Stop() + + for { + select { + case <-done: + return + case <-ticker.C: + err := repo.SaveFullIndex() + if err != nil { + t.Fatal(err) + } + } + } + } + + wg.Add(1) + go saveIndex() + + <-time.After(testTimeout) + close(done) + + wg.Wait() +} + +func TestArchiverDuplication(t *testing.T) { + for i := 0; i < 5; i++ { + testArchiverDuplication(t) + } +} diff --git a/checker/repacker.go b/checker/repacker.go index e56522714..f3b158d0c 100644 --- 
a/checker/repacker.go +++ b/checker/repacker.go @@ -136,7 +136,7 @@ func repackBlob(src, dst *repository.Repository, id backend.ID) error { return errors.New("LoadBlob returned wrong data, len() doesn't match") } - _, err = dst.SaveAndEncrypt(blob.Type, buf, &id, true) + _, err = dst.SaveAndEncrypt(blob.Type, buf, &id) if err != nil { return err } diff --git a/repository/master_index.go b/repository/master_index.go index 37d3729a8..432754c53 100644 --- a/repository/master_index.go +++ b/repository/master_index.go @@ -13,23 +13,11 @@ import ( type MasterIndex struct { idx []*Index idxMutex sync.RWMutex - - inFlight struct { - backend.IDSet - sync.RWMutex - } } // NewMasterIndex creates a new master index. func NewMasterIndex() *MasterIndex { - return &MasterIndex{ - inFlight: struct { - backend.IDSet - sync.RWMutex - }{ - IDSet: backend.NewIDSet(), - }, - } + return &MasterIndex{} } // Lookup queries all known Indexes for the ID and returns the first match. @@ -154,68 +142,6 @@ func (mi *MasterIndex) Current() *Index { return newIdx } -// AddInFlight add the given ID to the list of in-flight IDs. An error is -// returned when the ID is already in the list. Setting ignoreDuplicates to -// true only checks the in flight list, otherwise the index itself is also -// tested. -func (mi *MasterIndex) AddInFlight(id backend.ID, ignoreDuplicates bool) error { - // The index + inFlight store must be searched for a matching id in one - // atomic operation. This requires locking the inFlight store and the - // index together! - mi.inFlight.Lock() - defer mi.inFlight.Unlock() - - if !ignoreDuplicates { - // Note: mi.Has read locks the index again. 
- mi.idxMutex.RLock() - defer mi.idxMutex.RUnlock() - } - - debug.Log("MasterIndex.AddInFlight", "adding %v", id.Str()) - if mi.inFlight.Has(id) { - return fmt.Errorf("%v is already in flight", id.Str()) - } - - if !ignoreDuplicates { - if mi.Has(id) { - return fmt.Errorf("%v is already indexed (fully processed)", id) - } - } - - mi.inFlight.Insert(id) - return nil -} - -// IsInFlight returns true iff the id is contained in the list of in-flight IDs. -func (mi *MasterIndex) IsInFlight(id backend.ID) bool { - // The index + inFlight store must be searched for a matching id in one - // atomic operation. This requires locking the inFlight store and the - // index together! - mi.inFlight.RLock() - defer mi.inFlight.RUnlock() - - mi.idxMutex.RLock() - defer mi.idxMutex.RUnlock() - - inFlight := mi.inFlight.Has(id) - debug.Log("MasterIndex.IsInFlight", "testing whether %v is in flight: %v", id.Str(), inFlight) - - indexed := mi.Has(id) - debug.Log("MasterIndex.IsInFlight", "testing whether %v is indexed (fully processed): %v", id.Str(), indexed) - - return inFlight -} - -// RemoveFromInFlight deletes the given ID from the liste of in-flight IDs. -func (mi *MasterIndex) RemoveFromInFlight(id backend.ID) { - mi.inFlight.Lock() - defer mi.inFlight.Unlock() - - debug.Log("MasterIndex.RemoveFromInFlight", "removing %v from list of in flight blobs", id.Str()) - - mi.inFlight.Delete(id) -} - // NotFinalIndexes returns all indexes that have not yet been saved. 
func (mi *MasterIndex) NotFinalIndexes() []*Index { mi.idxMutex.Lock() diff --git a/repository/packer_manager.go b/repository/packer_manager.go index 42ffe96cb..4e9ea4bc0 100644 --- a/repository/packer_manager.go +++ b/repository/packer_manager.go @@ -84,7 +84,6 @@ func (r *Repository) savePacker(p *pack.Packer) error { Offset: b.Offset, Length: uint(b.Length), }) - r.idx.RemoveFromInFlight(b.ID) } return nil diff --git a/repository/repository.go b/repository/repository.go index 88f289338..27fe5d9c6 100644 --- a/repository/repository.go +++ b/repository/repository.go @@ -167,10 +167,9 @@ func (r *Repository) LookupBlobSize(id backend.ID) (uint, error) { return r.idx.LookupSize(id) } -// SaveAndEncrypt encrypts data and stores it to the backend as type t. If data is small -// enough, it will be packed together with other small blobs. When -// ignoreDuplicates is true, blobs already in the index will be saved again. -func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID, ignoreDuplicates bool) (backend.ID, error) { +// SaveAndEncrypt encrypts data and stores it to the backend as type t. If data +// is small enough, it will be packed together with other small blobs. +func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID) (backend.ID, error) { if id == nil { // compute plaintext hash hashedID := backend.Hash(data) @@ -189,18 +188,9 @@ func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID return backend.ID{}, err } - // add this id to the list of in-flight chunk ids. 
- debug.Log("Repo.Save", "add %v to list of in-flight IDs", id.Str()) - err = r.idx.AddInFlight(*id, ignoreDuplicates) - if err != nil { - debug.Log("Repo.Save", "another goroutine is already working on %v (%v) does already exist", id.Str, t) - return *id, nil - } - // find suitable packer and add blob packer, err := r.findPacker(uint(len(ciphertext))) if err != nil { - r.idx.RemoveFromInFlight(*id) return backend.ID{}, err } @@ -234,7 +224,7 @@ func (r *Repository) SaveFrom(t pack.BlobType, id *backend.ID, length uint, rd i return err } - _, err = r.SaveAndEncrypt(t, buf, id, false) + _, err = r.SaveAndEncrypt(t, buf, id) if err != nil { return err } @@ -258,7 +248,7 @@ func (r *Repository) SaveJSON(t pack.BlobType, item interface{}) (backend.ID, er } buf = wr.Bytes() - return r.SaveAndEncrypt(t, buf, nil, false) + return r.SaveAndEncrypt(t, buf, nil) } // SaveJSONUnpacked serialises item as JSON and encrypts and saves it in the diff --git a/repository/repository_test.go b/repository/repository_test.go index 4e3659af2..81378742b 100644 --- a/repository/repository_test.go +++ b/repository/repository_test.go @@ -83,7 +83,7 @@ func TestSave(t *testing.T) { id := backend.Hash(data) // save - sid, err := repo.SaveAndEncrypt(pack.Data, data, nil, false) + sid, err := repo.SaveAndEncrypt(pack.Data, data, nil) OK(t, err) Equals(t, id, sid) @@ -253,7 +253,7 @@ func saveRandomDataBlobs(t testing.TB, repo *repository.Repository, num int, siz _, err := io.ReadFull(rand.Reader, buf) OK(t, err) - _, err = repo.SaveAndEncrypt(pack.Data, buf, nil, false) + _, err = repo.SaveAndEncrypt(pack.Data, buf, nil) OK(t, err) } }