From f53008d916dac399bfbfa9dd93b5181a8fe7de81 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 17 Jan 2016 21:14:55 +0100 Subject: [PATCH 1/2] Allow saving duplicate blobs in the repacker This adds code to the master index to allow saving duplicate blobs within the repacker. In this mode, only the list of currently in-flight blobs is consulted, and not the index. This is correct because while repacking, a unique list of blobs is saved again to the index. --- checker/repacker.go | 2 +- repository/master_index.go | 70 +++++++++++++++++++---------------- repository/repository.go | 11 +++--- repository/repository_test.go | 4 +- 4 files changed, 47 insertions(+), 40 deletions(-) diff --git a/checker/repacker.go b/checker/repacker.go index f3b158d0c..e56522714 100644 --- a/checker/repacker.go +++ b/checker/repacker.go @@ -136,7 +136,7 @@ func repackBlob(src, dst *repository.Repository, id backend.ID) error { return errors.New("LoadBlob returned wrong data, len() doesn't match") } - _, err = dst.SaveAndEncrypt(blob.Type, buf, &id) + _, err = dst.SaveAndEncrypt(blob.Type, buf, &id, true) if err != nil { return err } diff --git a/repository/master_index.go b/repository/master_index.go index 2f63e82a7..37d3729a8 100644 --- a/repository/master_index.go +++ b/repository/master_index.go @@ -155,49 +155,55 @@ func (mi *MasterIndex) Current() *Index { } // AddInFlight add the given ID to the list of in-flight IDs. An error is -// returned when the ID is already in the list. -func (mi *MasterIndex) AddInFlight(id backend.ID) error { - // The index + inFlight store must be searched for a matching id in one - // atomic operation. This requires locking the inFlight store and the - // index together! - mi.inFlight.Lock() - defer mi.inFlight.Unlock() +// returned when the ID is already in the list. Setting ignoreDuplicates to +// true only checks the in flight list, otherwise the index itself is also +// tested. 
+func (mi *MasterIndex) AddInFlight(id backend.ID, ignoreDuplicates bool) error { + // The index + inFlight store must be searched for a matching id in one + // atomic operation. This requires locking the inFlight store and the + // index together! + mi.inFlight.Lock() + defer mi.inFlight.Unlock() - // Note: mi.Has read locks the index again. - mi.idxMutex.RLock() - defer mi.idxMutex.RUnlock() + if !ignoreDuplicates { + // Note: mi.Has read locks the index again. + mi.idxMutex.RLock() + defer mi.idxMutex.RUnlock() + } - debug.Log("MasterIndex.AddInFlight", "adding %v", id) - if mi.inFlight.Has(id) { - return fmt.Errorf("%v is already in flight", id) - } - if mi.Has(id) { - return fmt.Errorf("%v is already indexed (fully processed)", id) - } + debug.Log("MasterIndex.AddInFlight", "adding %v", id.Str()) + if mi.inFlight.Has(id) { + return fmt.Errorf("%v is already in flight", id.Str()) + } - mi.inFlight.Insert(id) - return nil + if !ignoreDuplicates { + if mi.Has(id) { + return fmt.Errorf("%v is already indexed (fully processed)", id) + } + } + + mi.inFlight.Insert(id) + return nil } // IsInFlight returns true iff the id is contained in the list of in-flight IDs. func (mi *MasterIndex) IsInFlight(id backend.ID) bool { - // The index + inFlight store must be searched for a matching id in one - // atomic operation. This requires locking the inFlight store and the - // index together! - mi.inFlight.RLock() - defer mi.inFlight.RUnlock() + // The index + inFlight store must be searched for a matching id in one + // atomic operation. This requires locking the inFlight store and the + // index together! + mi.inFlight.RLock() + defer mi.inFlight.RUnlock() - // Note: mi.Has read locks the index again. 
- mi.idxMutex.RLock() - defer mi.idxMutex.RUnlock() + mi.idxMutex.RLock() + defer mi.idxMutex.RUnlock() - inFlight := mi.inFlight.Has(id) - debug.Log("MasterIndex.IsInFlight", "testing whether %v is in flight: %v", id.Str(), inFlight) + inFlight := mi.inFlight.Has(id) + debug.Log("MasterIndex.IsInFlight", "testing whether %v is in flight: %v", id.Str(), inFlight) - indexed := mi.Has(id) - debug.Log("MasterIndex.IsInFlight", "testing whether %v is indexed (fully processed): %v", id.Str(), indexed) + indexed := mi.Has(id) + debug.Log("MasterIndex.IsInFlight", "testing whether %v is indexed (fully processed): %v", id.Str(), indexed) - return inFlight + return inFlight } // RemoveFromInFlight deletes the given ID from the liste of in-flight IDs. diff --git a/repository/repository.go b/repository/repository.go index 0b1955486..bc5e380ac 100644 --- a/repository/repository.go +++ b/repository/repository.go @@ -219,8 +219,9 @@ func (r *Repository) LookupBlobSize(id backend.ID) (uint, error) { } // SaveAndEncrypt encrypts data and stores it to the backend as type t. If data is small -// enough, it will be packed together with other small blobs. -func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID) (backend.ID, error) { +// enough, it will be packed together with other small blobs. When +// ignoreDuplicates is true, blobs already in the index will be saved again. +func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID, ignoreDuplicates bool) (backend.ID, error) { if id == nil { // compute plaintext hash hashedID := backend.Hash(data) @@ -241,7 +242,7 @@ func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID // add this id to the list of in-flight chunk ids. 
debug.Log("Repo.Save", "add %v to list of in-flight IDs", id.Str()) - err = r.idx.AddInFlight(*id) + err = r.idx.AddInFlight(*id, ignoreDuplicates) if err != nil { debug.Log("Repo.Save", "another goroutine is already working on %v (%v) does already exist", id.Str, t) return *id, nil @@ -284,7 +285,7 @@ func (r *Repository) SaveFrom(t pack.BlobType, id *backend.ID, length uint, rd i return err } - _, err = r.SaveAndEncrypt(t, buf, id) + _, err = r.SaveAndEncrypt(t, buf, id, false) if err != nil { return err } @@ -308,7 +309,7 @@ func (r *Repository) SaveJSON(t pack.BlobType, item interface{}) (backend.ID, er } buf = wr.Bytes() - return r.SaveAndEncrypt(t, buf, nil) + return r.SaveAndEncrypt(t, buf, nil, false) } // SaveJSONUnpacked serialises item as JSON and encrypts and saves it in the diff --git a/repository/repository_test.go b/repository/repository_test.go index 81378742b..4e3659af2 100644 --- a/repository/repository_test.go +++ b/repository/repository_test.go @@ -83,7 +83,7 @@ func TestSave(t *testing.T) { id := backend.Hash(data) // save - sid, err := repo.SaveAndEncrypt(pack.Data, data, nil) + sid, err := repo.SaveAndEncrypt(pack.Data, data, nil, false) OK(t, err) Equals(t, id, sid) @@ -253,7 +253,7 @@ func saveRandomDataBlobs(t testing.TB, repo *repository.Repository, num int, siz _, err := io.ReadFull(rand.Reader, buf) OK(t, err) - _, err = repo.SaveAndEncrypt(pack.Data, buf, nil) + _, err = repo.SaveAndEncrypt(pack.Data, buf, nil, false) OK(t, err) } } From 109a120b397ac16266fe47f7ef8819054ebf8483 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 17 Jan 2016 21:27:51 +0100 Subject: [PATCH 2/2] Fix RandomReader --- crypto/crypto_test.go | 6 ++---- test/helpers.go | 1 - 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/crypto/crypto_test.go b/crypto/crypto_test.go index 50895c65c..a821e105f 100644 --- a/crypto/crypto_test.go +++ b/crypto/crypto_test.go @@ -133,7 +133,6 @@ func TestLargeEncrypt(t *testing.T) { func 
BenchmarkEncryptWriter(b *testing.B) { size := 8 << 20 // 8MiB - rd := RandomReader(23, size) k := crypto.NewRandomKey() @@ -141,7 +140,7 @@ func BenchmarkEncryptWriter(b *testing.B) { b.SetBytes(int64(size)) for i := 0; i < b.N; i++ { - rd.Seek(0, 0) + rd := RandomReader(23, size) wr := crypto.EncryptTo(k, ioutil.Discard) n, err := io.Copy(wr, rd) OK(b, err) @@ -195,14 +194,13 @@ func BenchmarkEncryptDecryptReader(b *testing.B) { k := crypto.NewRandomKey() size := 8 << 20 // 8MiB - rd := RandomReader(23, size) b.ResetTimer() b.SetBytes(int64(size)) buf := bytes.NewBuffer(nil) for i := 0; i < b.N; i++ { - rd.Seek(0, 0) + rd := RandomReader(23, size) buf.Reset() wr := crypto.EncryptTo(k, buf) _, err := io.Copy(wr, rd) diff --git a/test/helpers.go b/test/helpers.go index 69a75cd2c..bfab04cc9 100644 --- a/test/helpers.go +++ b/test/helpers.go @@ -90,7 +90,6 @@ type rndReader struct { } func (r *rndReader) Read(p []byte) (int, error) { - fmt.Printf("Read(%v)\n", len(p)) for i := range p { p[i] = byte(r.src.Uint32()) }