diff --git a/archiver.go b/archiver.go index 529dd7cdb..23376e656 100644 --- a/archiver.go +++ b/archiver.go @@ -31,7 +31,11 @@ var archiverAllowAllFiles = func(string, os.FileInfo) bool { return true } // Archiver is used to backup a set of directories. type Archiver struct { - repo *repository.Repository + repo *repository.Repository + knownBlobs struct { + backend.IDSet + sync.Mutex + } blobToken chan struct{} @@ -45,6 +49,12 @@ func NewArchiver(repo *repository.Repository) *Archiver { arch := &Archiver{ repo: repo, blobToken: make(chan struct{}, maxConcurrentBlobs), + knownBlobs: struct { + backend.IDSet + sync.Mutex + }{ + IDSet: backend.NewIDSet(), + }, } for i := 0; i < maxConcurrentBlobs; i++ { @@ -57,17 +67,37 @@ func NewArchiver(repo *repository.Repository) *Archiver { return arch } +// isKnownBlob returns true iff the blob is not yet in the list of known blobs. +// When the blob is not known, false is returned and the blob is added to the +// list. This means that the caller false is returned to is responsible to save +// the blob to the backend. +func (arch *Archiver) isKnownBlob(id backend.ID) bool { + arch.knownBlobs.Lock() + defer arch.knownBlobs.Unlock() + + if arch.knownBlobs.Has(id) { + return true + } + + arch.knownBlobs.Insert(id) + + _, err := arch.repo.Index().Lookup(id) + if err == nil { + return true + } + + return false +} + // Save stores a blob read from rd in the repository. func (arch *Archiver) Save(t pack.BlobType, id backend.ID, length uint, rd io.Reader) error { debug.Log("Archiver.Save", "Save(%v, %v)\n", t, id.Str()) - // test if this blob is already known - if arch.repo.Index().Has(id) { - debug.Log("Archiver.Save", "(%v, %v) already saved\n", t, id.Str()) + if arch.isKnownBlob(id) { + debug.Log("Archiver.Save", "blob %v is known\n", id.Str()) return nil } - // otherwise save blob err := arch.repo.SaveFrom(t, &id, length, rd) if err != nil { debug.Log("Archiver.Save", "Save(%v, %v): error %v\n", t, id.Str(), err) @@ -88,7 +118,7 @@ func (arch *Archiver) SaveTreeJSON(item interface{}) (backend.ID, error) { // check if tree has been saved before id := backend.Hash(data) - if arch.repo.Index().IsInFlight(id) || arch.repo.Index().Has(id) { + if arch.isKnownBlob(id) { return id, nil } diff --git a/checker/repacker.go b/checker/repacker.go index e56522714..f3b158d0c 100644 --- a/checker/repacker.go +++ b/checker/repacker.go @@ -136,7 +136,7 @@ func repackBlob(src, dst *repository.Repository, id backend.ID) error { return errors.New("LoadBlob returned wrong data, len() doesn't match") } - _, err = dst.SaveAndEncrypt(blob.Type, buf, &id, true) + _, err = dst.SaveAndEncrypt(blob.Type, buf, &id) if err != nil { return err } diff --git a/repository/master_index.go b/repository/master_index.go index 37d3729a8..432754c53 100644 --- a/repository/master_index.go +++ b/repository/master_index.go @@ -13,23 +13,11 @@ import ( type MasterIndex struct { idx []*Index idxMutex sync.RWMutex - - inFlight struct { - backend.IDSet - sync.RWMutex - } } // NewMasterIndex creates a new master index. func NewMasterIndex() *MasterIndex { - return &MasterIndex{ - inFlight: struct { - backend.IDSet - sync.RWMutex - }{ - IDSet: backend.NewIDSet(), - }, - } + return &MasterIndex{} } // Lookup queries all known Indexes for the ID and returns the first match. @@ -154,68 +142,6 @@ func (mi *MasterIndex) Current() *Index { return newIdx } -// AddInFlight add the given ID to the list of in-flight IDs. An error is -// returned when the ID is already in the list. Setting ignoreDuplicates to -// true only checks the in flight list, otherwise the index itself is also -// tested. -func (mi *MasterIndex) AddInFlight(id backend.ID, ignoreDuplicates bool) error { - // The index + inFlight store must be searched for a matching id in one - // atomic operation. This requires locking the inFlight store and the - // index together! - mi.inFlight.Lock() - defer mi.inFlight.Unlock() - - if !ignoreDuplicates { - // Note: mi.Has read locks the index again. - mi.idxMutex.RLock() - defer mi.idxMutex.RUnlock() - } - - debug.Log("MasterIndex.AddInFlight", "adding %v", id.Str()) - if mi.inFlight.Has(id) { - return fmt.Errorf("%v is already in flight", id.Str()) - } - - if !ignoreDuplicates { - if mi.Has(id) { - return fmt.Errorf("%v is already indexed (fully processed)", id) - } - } - - mi.inFlight.Insert(id) - return nil -} - -// IsInFlight returns true iff the id is contained in the list of in-flight IDs. -func (mi *MasterIndex) IsInFlight(id backend.ID) bool { - // The index + inFlight store must be searched for a matching id in one - // atomic operation. This requires locking the inFlight store and the - // index together! - mi.inFlight.RLock() - defer mi.inFlight.RUnlock() - - mi.idxMutex.RLock() - defer mi.idxMutex.RUnlock() - - inFlight := mi.inFlight.Has(id) - debug.Log("MasterIndex.IsInFlight", "testing whether %v is in flight: %v", id.Str(), inFlight) - - indexed := mi.Has(id) - debug.Log("MasterIndex.IsInFlight", "testing whether %v is indexed (fully processed): %v", id.Str(), indexed) - - return inFlight -} - -// RemoveFromInFlight deletes the given ID from the liste of in-flight IDs. -func (mi *MasterIndex) RemoveFromInFlight(id backend.ID) { - mi.inFlight.Lock() - defer mi.inFlight.Unlock() - - debug.Log("MasterIndex.RemoveFromInFlight", "removing %v from list of in flight blobs", id.Str()) - - mi.inFlight.Delete(id) -} - // NotFinalIndexes returns all indexes that have not yet been saved. func (mi *MasterIndex) NotFinalIndexes() []*Index { mi.idxMutex.Lock() diff --git a/repository/packer_manager.go b/repository/packer_manager.go index 42ffe96cb..4e9ea4bc0 100644 --- a/repository/packer_manager.go +++ b/repository/packer_manager.go @@ -84,7 +84,6 @@ func (r *Repository) savePacker(p *pack.Packer) error { Offset: b.Offset, Length: uint(b.Length), }) - r.idx.RemoveFromInFlight(b.ID) } return nil diff --git a/repository/repository.go b/repository/repository.go index 88f289338..27fe5d9c6 100644 --- a/repository/repository.go +++ b/repository/repository.go @@ -167,10 +167,9 @@ func (r *Repository) LookupBlobSize(id backend.ID) (uint, error) { return r.idx.LookupSize(id) } -// SaveAndEncrypt encrypts data and stores it to the backend as type t. If data is small -// enough, it will be packed together with other small blobs. When -// ignoreDuplicates is true, blobs already in the index will be saved again. -func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID, ignoreDuplicates bool) (backend.ID, error) { +// SaveAndEncrypt encrypts data and stores it to the backend as type t. If data +// is small enough, it will be packed together with other small blobs. +func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID) (backend.ID, error) { if id == nil { // compute plaintext hash hashedID := backend.Hash(data) @@ -189,18 +188,9 @@ func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID return backend.ID{}, err } - // add this id to the list of in-flight chunk ids. - debug.Log("Repo.Save", "add %v to list of in-flight IDs", id.Str()) - err = r.idx.AddInFlight(*id, ignoreDuplicates) - if err != nil { - debug.Log("Repo.Save", "another goroutine is already working on %v (%v) does already exist", id.Str, t) - return *id, nil - } - // find suitable packer and add blob packer, err := r.findPacker(uint(len(ciphertext))) if err != nil { - r.idx.RemoveFromInFlight(*id) return backend.ID{}, err } @@ -234,7 +224,7 @@ func (r *Repository) SaveFrom(t pack.BlobType, id *backend.ID, length uint, rd i return err } - _, err = r.SaveAndEncrypt(t, buf, id, false) + _, err = r.SaveAndEncrypt(t, buf, id) if err != nil { return err } @@ -258,7 +248,7 @@ func (r *Repository) SaveJSON(t pack.BlobType, item interface{}) (backend.ID, er } buf = wr.Bytes() - return r.SaveAndEncrypt(t, buf, nil, false) + return r.SaveAndEncrypt(t, buf, nil) } // SaveJSONUnpacked serialises item as JSON and encrypts and saves it in the diff --git a/repository/repository_test.go b/repository/repository_test.go index 4e3659af2..81378742b 100644 --- a/repository/repository_test.go +++ b/repository/repository_test.go @@ -83,7 +83,7 @@ func TestSave(t *testing.T) { id := backend.Hash(data) // save - sid, err := repo.SaveAndEncrypt(pack.Data, data, nil, false) + sid, err := repo.SaveAndEncrypt(pack.Data, data, nil) OK(t, err) Equals(t, id, sid) @@ -253,7 +253,7 @@ func saveRandomDataBlobs(t testing.TB, repo *repository.Repository, num int, siz _, err := io.ReadFull(rand.Reader, buf) OK(t, err) - _, err = repo.SaveAndEncrypt(pack.Data, buf, nil, false) + _, err = repo.SaveAndEncrypt(pack.Data, buf, nil) OK(t, err) } }