2
2
mirror of https://github.com/octoleo/restic.git synced 2024-11-29 16:23:59 +00:00

Merge pull request #2773 from aawsome/index-uploads+knownblobs

Fix non-intuitive repo behavior
This commit is contained in:
MichaelEischer 2020-06-12 22:41:04 +02:00 committed by GitHub
commit 735a8074d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 180 additions and 253 deletions

View File

@ -0,0 +1,6 @@
Enhancement: Optimize handling of new index entries
Restic now uses less memory for backups which add a lot of data, e.g. large initial backups.
In addition, we've improved the stability in some edge cases.
https://github.com/restic/restic/pull/2773

View File

@ -575,24 +575,6 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
ParentSnapshot: *parentSnapshotID, ParentSnapshot: *parentSnapshotID,
} }
uploader := archiver.IndexUploader{
Repository: repo,
Start: func() {
if !gopts.JSON {
p.VV("uploading intermediate index")
}
},
Complete: func(id restic.ID) {
if !gopts.JSON {
p.V("uploaded intermediate index %v", id.Str())
}
},
}
t.Go(func() error {
return uploader.Upload(gopts.ctx, t.Context(gopts.ctx), 30*time.Second)
})
if !gopts.JSON { if !gopts.JSON {
p.V("start backup on %v", targets) p.V("start backup on %v", targets)
} }

View File

@ -87,6 +87,9 @@ func runPrune(gopts GlobalOptions) error {
return err return err
} }
// we do not need index updates while pruning!
repo.DisableAutoIndexUpdate()
return pruneRepository(gopts, repo) return pruneRepository(gopts, repo)
} }

View File

@ -130,11 +130,6 @@ func runRecover(gopts GlobalOptions) error {
return errors.Fatalf("unable to save blobs to the repo: %v", err) return errors.Fatalf("unable to save blobs to the repo: %v", err)
} }
err = repo.SaveIndex(gopts.ctx)
if err != nil {
return errors.Fatalf("unable to save new index to the repo: %v", err)
}
sn, err := restic.NewSnapshot([]string{"/recover"}, []string{}, hostname, time.Now()) sn, err := restic.NewSnapshot([]string{"/recover"}, []string{}, hostname, time.Now())
if err != nil { if err != nil {
return errors.Fatalf("unable to save snapshot: %v", err) return errors.Fatalf("unable to save snapshot: %v", err)

View File

@ -783,11 +783,6 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps
return nil, restic.ID{}, err return nil, restic.ID{}, err
} }
err = arch.Repo.SaveIndex(ctx)
if err != nil {
return nil, restic.ID{}, err
}
sn, err := restic.NewSnapshot(targets, opts.Tags, opts.Hostname, opts.Time) sn, err := restic.NewSnapshot(targets, opts.Tags, opts.Hostname, opts.Time)
if err != nil { if err != nil {
return nil, restic.ID{}, err return nil, restic.ID{}, err

View File

@ -96,11 +96,6 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem
t.Fatal(err) t.Fatal(err)
} }
err = repo.SaveIndex(ctx)
if err != nil {
t.Fatal(err)
}
if !startCallback { if !startCallback {
t.Errorf("start callback did not happen") t.Errorf("start callback did not happen")
} }
@ -418,13 +413,16 @@ type blobCountingRepo struct {
saved map[restic.BlobHandle]uint saved map[restic.BlobHandle]uint
} }
func (repo *blobCountingRepo) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID) (restic.ID, error) { func (repo *blobCountingRepo) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool) (restic.ID, bool, error) {
id, err := repo.Repository.SaveBlob(ctx, t, buf, id) id, exists, err := repo.Repository.SaveBlob(ctx, t, buf, id, false)
if exists {
return id, exists, err
}
h := restic.BlobHandle{ID: id, Type: t} h := restic.BlobHandle{ID: id, Type: t}
repo.m.Lock() repo.m.Lock()
repo.saved[h]++ repo.saved[h]++
repo.m.Unlock() repo.m.Unlock()
return id, err return id, exists, err
} }
func (repo *blobCountingRepo) SaveTree(ctx context.Context, t *restic.Tree) (restic.ID, error) { func (repo *blobCountingRepo) SaveTree(ctx context.Context, t *restic.Tree) (restic.ID, error) {
@ -853,11 +851,6 @@ func TestArchiverSaveDir(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
err = repo.SaveIndex(ctx)
if err != nil {
t.Fatal(err)
}
want := test.want want := test.want
if want == nil { if want == nil {
want = test.src want = test.src
@ -946,11 +939,6 @@ func TestArchiverSaveDirIncremental(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
err = repo.SaveIndex(ctx)
if err != nil {
t.Fatal(err)
}
for h, n := range repo.saved { for h, n := range repo.saved {
if n > 1 { if n > 1 {
t.Errorf("iteration %v: blob %v saved more than once (%d times)", i, h, n) t.Errorf("iteration %v: blob %v saved more than once (%d times)", i, h, n)
@ -1085,11 +1073,6 @@ func TestArchiverSaveTree(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
err = repo.SaveIndex(ctx)
if err != nil {
t.Fatal(err)
}
want := test.want want := test.want
if want == nil { if want == nil {
want = test.src want = test.src
@ -1841,13 +1824,13 @@ type failSaveRepo struct {
err error err error
} }
func (f *failSaveRepo) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID) (restic.ID, error) { func (f *failSaveRepo) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool) (restic.ID, bool, error) {
val := atomic.AddInt32(&f.cnt, 1) val := atomic.AddInt32(&f.cnt, 1)
if val >= f.failAfter { if val >= f.failAfter {
return restic.ID{}, f.err return restic.ID{}, false, f.err
} }
return f.Repository.SaveBlob(ctx, t, buf, id) return f.Repository.SaveBlob(ctx, t, buf, id, storeDuplicate)
} }
func TestArchiverAbortEarlyOnError(t *testing.T) { func TestArchiverAbortEarlyOnError(t *testing.T) {

View File

@ -2,7 +2,6 @@ package archiver
import ( import (
"context" "context"
"sync"
"github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
@ -11,17 +10,13 @@ import (
// Saver allows saving a blob. // Saver allows saving a blob.
type Saver interface { type Saver interface {
SaveBlob(ctx context.Context, t restic.BlobType, data []byte, id restic.ID) (restic.ID, error) SaveBlob(ctx context.Context, t restic.BlobType, data []byte, id restic.ID, storeDuplicate bool) (restic.ID, bool, error)
Index() restic.Index Index() restic.Index
} }
// BlobSaver concurrently saves incoming blobs to the repo. // BlobSaver concurrently saves incoming blobs to the repo.
type BlobSaver struct { type BlobSaver struct {
repo Saver repo Saver
m sync.Mutex
knownBlobs restic.BlobSet
ch chan<- saveBlobJob ch chan<- saveBlobJob
} }
@ -31,7 +26,6 @@ func NewBlobSaver(ctx context.Context, t *tomb.Tomb, repo Saver, workers uint) *
ch := make(chan saveBlobJob) ch := make(chan saveBlobJob)
s := &BlobSaver{ s := &BlobSaver{
repo: repo, repo: repo,
knownBlobs: restic.NewBlobSet(),
ch: ch, ch: ch,
} }
@ -106,45 +100,15 @@ type saveBlobResponse struct {
} }
func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) (saveBlobResponse, error) { func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) (saveBlobResponse, error) {
id := restic.Hash(buf) id, known, err := s.repo.SaveBlob(ctx, t, buf, restic.ID{}, false)
h := restic.BlobHandle{ID: id, Type: t}
// check if another goroutine has already saved this blob
known := false
s.m.Lock()
if s.knownBlobs.Has(h) {
known = true
} else {
s.knownBlobs.Insert(h)
known = false
}
s.m.Unlock()
// blob is already known, nothing to do
if known {
return saveBlobResponse{
id: id,
known: true,
}, nil
}
// check if the repo knows this blob
if s.repo.Index().Has(id, t) {
return saveBlobResponse{
id: id,
known: true,
}, nil
}
// otherwise we're responsible for saving it
_, err := s.repo.SaveBlob(ctx, t, buf, id)
if err != nil { if err != nil {
return saveBlobResponse{}, err return saveBlobResponse{}, err
} }
return saveBlobResponse{ return saveBlobResponse{
id: id, id: id,
known: false, known: known,
}, nil }, nil
} }

View File

@ -21,13 +21,13 @@ type saveFail struct {
failAt int32 failAt int32
} }
func (b *saveFail) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID) (restic.ID, error) { func (b *saveFail) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicates bool) (restic.ID, bool, error) {
val := atomic.AddInt32(&b.cnt, 1) val := atomic.AddInt32(&b.cnt, 1)
if val == b.failAt { if val == b.failAt {
return restic.ID{}, errTest return restic.ID{}, false, errTest
} }
return id, nil return id, false, nil
} }
func (b *saveFail) Index() restic.Index { func (b *saveFail) Index() restic.Index {

View File

@ -1,53 +0,0 @@
package archiver
import (
"context"
"time"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
)
// IndexUploader polls the repo for full indexes and uploads them.
type IndexUploader struct {
restic.Repository
// Start is called when an index is to be uploaded.
Start func()
// Complete is called when uploading an index has finished.
Complete func(id restic.ID)
}
// Upload periodically uploads full indexes to the repo. When shutdown is
// cancelled, the last index upload will finish and then Upload returns.
func (u IndexUploader) Upload(ctx, shutdown context.Context, interval time.Duration) error {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-shutdown.Done():
return nil
case <-ticker.C:
full := u.Repository.Index().(*repository.MasterIndex).FullIndexes()
for _, idx := range full {
if u.Start != nil {
u.Start()
}
id, err := repository.SaveIndex(ctx, u.Repository, idx)
if err != nil {
debug.Log("save indexes returned an error: %v", err)
return err
}
if u.Complete != nil {
u.Complete(id)
}
}
}
}
}

View File

@ -87,8 +87,7 @@ var IndexFull = func(idx *Index) bool {
} }
// Store remembers the id and pack in the index. An existing entry will be // Store remembers the id and pack in the index.
// silently overwritten.
func (idx *Index) Store(blob restic.PackedBlob) { func (idx *Index) Store(blob restic.PackedBlob) {
idx.m.Lock() idx.m.Lock()
defer idx.m.Unlock() defer idx.m.Unlock()
@ -102,6 +101,23 @@ func (idx *Index) Store(blob restic.PackedBlob) {
idx.store(blob) idx.store(blob)
} }
// StorePack remembers the ids of all blobs of a given pack
// in the index
func (idx *Index) StorePack(id restic.ID, blobs []restic.Blob) {
idx.m.Lock()
defer idx.m.Unlock()
if idx.final {
panic("store new item in finalized index")
}
debug.Log("%v", blobs)
for _, blob := range blobs {
idx.store(restic.PackedBlob{Blob: blob, PackID: id})
}
}
// Lookup queries the index for the blob ID and returns a restic.PackedBlob. // Lookup queries the index for the blob ID and returns a restic.PackedBlob.
func (idx *Index) Lookup(id restic.ID, tpe restic.BlobType) (blobs []restic.PackedBlob, found bool) { func (idx *Index) Lookup(id restic.ID, tpe restic.BlobType) (blobs []restic.PackedBlob, found bool) {
idx.m.Lock() idx.m.Lock()
@ -353,15 +369,13 @@ func (idx *Index) encode(w io.Writer) error {
return enc.Encode(idxJSON) return enc.Encode(idxJSON)
} }
// Finalize sets the index to final and writes the JSON serialization to w. // Finalize sets the index to final.
func (idx *Index) Finalize(w io.Writer) error { func (idx *Index) Finalize() {
debug.Log("encoding index") debug.Log("finalizing index")
idx.m.Lock() idx.m.Lock()
defer idx.m.Unlock() defer idx.m.Unlock()
idx.final = true idx.final = true
return idx.encode(w)
} }
// ID returns the ID of the index, if available. If the index is not yet // ID returns the ID of the index, if available. If the index is not yet

View File

@ -123,9 +123,10 @@ func TestIndexSerialize(t *testing.T) {
} }
} }
// serialize idx, unserialize to idx3 // finalize; serialize idx, unserialize to idx3
idx.Finalize()
wr3 := bytes.NewBuffer(nil) wr3 := bytes.NewBuffer(nil)
err = idx.Finalize(wr3) err = idx.Encode(wr3)
rtest.OK(t, err) rtest.OK(t, err)
rtest.Assert(t, idx.Final(), rtest.Assert(t, idx.Final(),

View File

@ -12,12 +12,13 @@ import (
// MasterIndex is a collection of indexes and IDs of chunks that are in the process of being saved. // MasterIndex is a collection of indexes and IDs of chunks that are in the process of being saved.
type MasterIndex struct { type MasterIndex struct {
idx []*Index idx []*Index
pendingBlobs restic.BlobSet
idxMutex sync.RWMutex idxMutex sync.RWMutex
} }
// NewMasterIndex creates a new master index. // NewMasterIndex creates a new master index.
func NewMasterIndex() *MasterIndex { func NewMasterIndex() *MasterIndex {
return &MasterIndex{} return &MasterIndex{pendingBlobs: restic.NewBlobSet()}
} }
// Lookup queries all known Indexes for the ID and returns the first match. // Lookup queries all known Indexes for the ID and returns the first match.
@ -65,11 +66,42 @@ func (mi *MasterIndex) ListPack(id restic.ID) (list []restic.PackedBlob) {
return nil return nil
} }
// AddPending adds a given blob to list of pending Blobs
// Before doing so it checks if this blob is already known.
// Returns true if adding was successful and false if the blob
// was already known
func (mi *MasterIndex) addPending(id restic.ID, tpe restic.BlobType) bool {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
// Check if blob is pending or in index
if mi.pendingBlobs.Has(restic.BlobHandle{ID: id, Type: tpe}) {
return false
}
for _, idx := range mi.idx {
if idx.Has(id, tpe) {
return false
}
}
// really not known -> insert
mi.pendingBlobs.Insert(restic.BlobHandle{ID: id, Type: tpe})
return true
}
// Has queries all known Indexes for the ID and returns the first match. // Has queries all known Indexes for the ID and returns the first match.
// Also returns true if the ID is pending.
func (mi *MasterIndex) Has(id restic.ID, tpe restic.BlobType) bool { func (mi *MasterIndex) Has(id restic.ID, tpe restic.BlobType) bool {
mi.idxMutex.RLock() mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock() defer mi.idxMutex.RUnlock()
// also return true if blob is pending
if mi.pendingBlobs.Has(restic.BlobHandle{ID: id, Type: tpe}) {
return true
}
for _, idx := range mi.idx { for _, idx := range mi.idx {
if idx.Has(id, tpe) { if idx.Has(id, tpe) {
return true return true
@ -114,24 +146,30 @@ func (mi *MasterIndex) Remove(index *Index) {
} }
// Store remembers the id and pack in the index. // Store remembers the id and pack in the index.
func (mi *MasterIndex) Store(pb restic.PackedBlob) { func (mi *MasterIndex) StorePack(id restic.ID, blobs []restic.Blob) {
mi.idxMutex.Lock() mi.idxMutex.Lock()
defer mi.idxMutex.Unlock() defer mi.idxMutex.Unlock()
// delete blobs from pending
for _, blob := range blobs {
mi.pendingBlobs.Delete(restic.BlobHandle{Type: blob.Type, ID: blob.ID})
}
for _, idx := range mi.idx { for _, idx := range mi.idx {
if !idx.Final() { if !idx.Final() {
idx.Store(pb) idx.StorePack(id, blobs)
return return
} }
} }
newIdx := NewIndex() newIdx := NewIndex()
newIdx.Store(pb) newIdx.StorePack(id, blobs)
mi.idx = append(mi.idx, newIdx) mi.idx = append(mi.idx, newIdx)
} }
// NotFinalIndexes returns all indexes that have not yet been saved. // FinalizeNotFinalIndexes finalizes all indexes that
func (mi *MasterIndex) NotFinalIndexes() []*Index { // have not yet been saved and returns that list
func (mi *MasterIndex) FinalizeNotFinalIndexes() []*Index {
mi.idxMutex.Lock() mi.idxMutex.Lock()
defer mi.idxMutex.Unlock() defer mi.idxMutex.Unlock()
@ -139,6 +177,7 @@ func (mi *MasterIndex) NotFinalIndexes() []*Index {
for _, idx := range mi.idx { for _, idx := range mi.idx {
if !idx.Final() { if !idx.Final() {
idx.Finalize()
list = append(list, idx) list = append(list, idx)
} }
} }
@ -147,8 +186,8 @@ func (mi *MasterIndex) NotFinalIndexes() []*Index {
return list return list
} }
// FullIndexes returns all indexes that are full. // FinalizeFullIndexes finalizes all indexes that are full and returns that list.
func (mi *MasterIndex) FullIndexes() []*Index { func (mi *MasterIndex) FinalizeFullIndexes() []*Index {
mi.idxMutex.Lock() mi.idxMutex.Lock()
defer mi.idxMutex.Unlock() defer mi.idxMutex.Unlock()
@ -163,6 +202,7 @@ func (mi *MasterIndex) FullIndexes() []*Index {
if IndexFull(idx) { if IndexFull(idx) {
debug.Log("index %p is full", idx) debug.Log("index %p is full", idx)
idx.Finalize()
list = append(list, idx) list = append(list, idx)
} else { } else {
debug.Log("index %p not full", idx) debug.Log("index %p not full", idx)

View File

@ -136,21 +136,15 @@ func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packe
} }
// update blobs in the index // update blobs in the index
for _, b := range p.Packer.Blobs() { debug.Log(" updating blobs %v to pack %v", p.Packer.Blobs(), id)
debug.Log(" updating blob %v to pack %v", b.ID, id) r.idx.StorePack(id, p.Packer.Blobs())
r.idx.Store(restic.PackedBlob{
Blob: restic.Blob{
Type: b.Type,
ID: b.ID,
Offset: b.Offset,
Length: uint(b.Length),
},
PackID: id,
})
}
// Save index if full
if r.noAutoIndexUpdate {
return nil return nil
} }
return r.SaveFullIndex(ctx)
}
// countPacker returns the number of open (unfinished) packers. // countPacker returns the number of open (unfinished) packers.
func (r *packerManager) countPacker() int { func (r *packerManager) countPacker() int {

View File

@ -84,7 +84,8 @@ func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, kee
h, tempfile.Name(), id) h, tempfile.Name(), id)
} }
_, err = repo.SaveBlob(ctx, entry.Type, plaintext, entry.ID) // We do want to save already saved blobs!
_, _, err = repo.SaveBlob(ctx, entry.Type, plaintext, entry.ID, true)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -31,18 +31,17 @@ func createRandomBlobs(t testing.TB, repo restic.Repository, blobs int, pData fl
buf := make([]byte, length) buf := make([]byte, length)
rand.Read(buf) rand.Read(buf)
id := restic.Hash(buf)
if repo.Index().Has(id, restic.DataBlob) { id, exists, err := repo.SaveBlob(context.TODO(), tpe, buf, restic.ID{}, false)
t.Errorf("duplicate blob %v/%v ignored", id, restic.DataBlob)
continue
}
_, err := repo.SaveBlob(context.TODO(), tpe, buf, id)
if err != nil { if err != nil {
t.Fatalf("SaveFrom() error %v", err) t.Fatalf("SaveFrom() error %v", err)
} }
if exists {
t.Errorf("duplicate blob %v/%v ignored", id, restic.DataBlob)
continue
}
if rand.Float32() < 0.2 { if rand.Float32() < 0.2 {
if err = repo.Flush(context.Background()); err != nil { if err = repo.Flush(context.Background()); err != nil {
t.Fatalf("repo.Flush() returned error %v", err) t.Fatalf("repo.Flush() returned error %v", err)

View File

@ -28,6 +28,7 @@ type Repository struct {
keyName string keyName string
idx *MasterIndex idx *MasterIndex
restic.Cache restic.Cache
noAutoIndexUpdate bool
treePM *packerManager treePM *packerManager
dataPM *packerManager dataPM *packerManager
@ -45,6 +46,10 @@ func New(be restic.Backend) *Repository {
return repo return repo
} }
func (r *Repository) DisableAutoIndexUpdate() {
r.noAutoIndexUpdate = true
}
// Config returns the repository configuration. // Config returns the repository configuration.
func (r *Repository) Config() restic.Config { func (r *Repository) Config() restic.Config {
return r.cfg return r.cfg
@ -221,13 +226,8 @@ func (r *Repository) LookupBlobSize(id restic.ID, tpe restic.BlobType) (uint, bo
// SaveAndEncrypt encrypts data and stores it to the backend as type t. If data // SaveAndEncrypt encrypts data and stores it to the backend as type t. If data
// is small enough, it will be packed together with other small blobs. // is small enough, it will be packed together with other small blobs.
func (r *Repository) SaveAndEncrypt(ctx context.Context, t restic.BlobType, data []byte, id *restic.ID) (restic.ID, error) { // The caller must ensure that the id matches the data.
if id == nil { func (r *Repository) SaveAndEncrypt(ctx context.Context, t restic.BlobType, data []byte, id restic.ID) error {
// compute plaintext hash
hashedID := restic.Hash(data)
id = &hashedID
}
debug.Log("save id %v (%v, %d bytes)", id, t, len(data)) debug.Log("save id %v (%v, %d bytes)", id, t, len(data))
nonce := crypto.NewRandomNonce() nonce := crypto.NewRandomNonce()
@ -252,24 +252,24 @@ func (r *Repository) SaveAndEncrypt(ctx context.Context, t restic.BlobType, data
packer, err := pm.findPacker() packer, err := pm.findPacker()
if err != nil { if err != nil {
return restic.ID{}, err return err
} }
// save ciphertext // save ciphertext
_, err = packer.Add(t, *id, ciphertext) _, err = packer.Add(t, id, ciphertext)
if err != nil { if err != nil {
return restic.ID{}, err return err
} }
// if the pack is not full enough, put back to the list // if the pack is not full enough, put back to the list
if packer.Size() < minPackSize { if packer.Size() < minPackSize {
debug.Log("pack is not full enough (%d bytes)", packer.Size()) debug.Log("pack is not full enough (%d bytes)", packer.Size())
pm.insertPacker(packer) pm.insertPacker(packer)
return *id, nil return nil
} }
// else write the pack to the backend // else write the pack to the backend
return *id, r.savePacker(ctx, t, packer) return r.savePacker(ctx, t, packer)
} }
// SaveJSONUnpacked serialises item as JSON and encrypts and saves it in the // SaveJSONUnpacked serialises item as JSON and encrypts and saves it in the
@ -307,8 +307,18 @@ func (r *Repository) SaveUnpacked(ctx context.Context, t restic.FileType, p []by
return id, nil return id, nil
} }
// Flush saves all remaining packs. // Flush saves all remaining packs and the index
func (r *Repository) Flush(ctx context.Context) error { func (r *Repository) Flush(ctx context.Context) error {
if err := r.FlushPacks(ctx); err != nil {
return err
}
// Save index after flushing
return r.SaveIndex(ctx)
}
// FlushPacks saves all remaining packs.
func (r *Repository) FlushPacks(ctx context.Context) error {
pms := []struct { pms := []struct {
t restic.BlobType t restic.BlobType
pm *packerManager pm *packerManager
@ -331,7 +341,6 @@ func (r *Repository) Flush(ctx context.Context) error {
p.pm.packers = p.pm.packers[:0] p.pm.packers = p.pm.packers[:0]
p.pm.pm.Unlock() p.pm.pm.Unlock()
} }
return nil return nil
} }
@ -366,7 +375,7 @@ func (r *Repository) SetIndex(i restic.Index) error {
func SaveIndex(ctx context.Context, repo restic.Repository, index *Index) (restic.ID, error) { func SaveIndex(ctx context.Context, repo restic.Repository, index *Index) (restic.ID, error) {
buf := bytes.NewBuffer(nil) buf := bytes.NewBuffer(nil)
err := index.Finalize(buf) err := index.Encode(buf)
if err != nil { if err != nil {
return restic.ID{}, err return restic.ID{}, err
} }
@ -392,12 +401,12 @@ func (r *Repository) saveIndex(ctx context.Context, indexes ...*Index) error {
// SaveIndex saves all new indexes in the backend. // SaveIndex saves all new indexes in the backend.
func (r *Repository) SaveIndex(ctx context.Context) error { func (r *Repository) SaveIndex(ctx context.Context) error {
return r.saveIndex(ctx, r.idx.NotFinalIndexes()...) return r.saveIndex(ctx, r.idx.FinalizeNotFinalIndexes()...)
} }
// SaveFullIndex saves all full indexes in the backend. // SaveFullIndex saves all full indexes in the backend.
func (r *Repository) SaveFullIndex(ctx context.Context) error { func (r *Repository) SaveFullIndex(ctx context.Context) error {
return r.saveIndex(ctx, r.idx.FullIndexes()...) return r.saveIndex(ctx, r.idx.FinalizeFullIndexes()...)
} }
const loadIndexParallelism = 4 const loadIndexParallelism = 4
@ -670,14 +679,29 @@ func (r *Repository) Close() error {
return r.be.Close() return r.be.Close()
} }
// SaveBlob saves a blob of type t into the repository. If id is the null id, it // SaveBlob saves a blob of type t into the repository.
// will be computed and returned. // It takes care that no duplicates are saved; this can be overwritten
func (r *Repository) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID) (restic.ID, error) { // by setting storeDuplicate to true.
var i *restic.ID // If id is the null id, it will be computed and returned.
if !id.IsNull() { // Also returns if the blob was already known before
i = &id func (r *Repository) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool) (newID restic.ID, known bool, err error) {
// compute plaintext hash if not already set
if id.IsNull() {
newID = restic.Hash(buf)
} else {
newID = id
} }
return r.SaveAndEncrypt(ctx, t, buf, i)
// first try to add to pending blobs; if not successful, this blob is already known
known = !r.idx.addPending(newID, t)
// only save when needed or explicitely told
if !known || storeDuplicate {
err = r.SaveAndEncrypt(ctx, t, buf, newID)
}
return newID, known, err
} }
// LoadTree loads a tree from the repository. // LoadTree loads a tree from the repository.
@ -711,12 +735,7 @@ func (r *Repository) SaveTree(ctx context.Context, t *restic.Tree) (restic.ID, e
// adds a newline after each object) // adds a newline after each object)
buf = append(buf, '\n') buf = append(buf, '\n')
id := restic.Hash(buf) id, _, err := r.SaveBlob(ctx, restic.TreeBlob, buf, restic.ID{}, false)
if r.idx.Has(id, restic.TreeBlob) {
return id, nil
}
_, err = r.SaveBlob(ctx, restic.TreeBlob, buf, id)
return id, err return id, err
} }

View File

@ -34,7 +34,7 @@ func TestSave(t *testing.T) {
id := restic.Hash(data) id := restic.Hash(data)
// save // save
sid, err := repo.SaveBlob(context.TODO(), restic.DataBlob, data, restic.ID{}) sid, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, data, restic.ID{}, false)
rtest.OK(t, err) rtest.OK(t, err)
rtest.Equals(t, id, sid) rtest.Equals(t, id, sid)
@ -69,7 +69,7 @@ func TestSaveFrom(t *testing.T) {
id := restic.Hash(data) id := restic.Hash(data)
// save // save
id2, err := repo.SaveBlob(context.TODO(), restic.DataBlob, data, id) id2, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, data, id, false)
rtest.OK(t, err) rtest.OK(t, err)
rtest.Equals(t, id, id2) rtest.Equals(t, id, id2)
@ -108,7 +108,7 @@ func BenchmarkSaveAndEncrypt(t *testing.B) {
for i := 0; i < t.N; i++ { for i := 0; i < t.N; i++ {
// save // save
_, err = repo.SaveBlob(context.TODO(), restic.DataBlob, data, id) _, _, err = repo.SaveBlob(context.TODO(), restic.DataBlob, data, id, false)
rtest.OK(t, err) rtest.OK(t, err)
} }
} }
@ -158,7 +158,7 @@ func TestLoadBlob(t *testing.T) {
_, err := io.ReadFull(rnd, buf) _, err := io.ReadFull(rnd, buf)
rtest.OK(t, err) rtest.OK(t, err)
id, err := repo.SaveBlob(context.TODO(), restic.DataBlob, buf, restic.ID{}) id, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, buf, restic.ID{}, false)
rtest.OK(t, err) rtest.OK(t, err)
rtest.OK(t, repo.Flush(context.Background())) rtest.OK(t, repo.Flush(context.Background()))
@ -187,7 +187,7 @@ func BenchmarkLoadBlob(b *testing.B) {
_, err := io.ReadFull(rnd, buf) _, err := io.ReadFull(rnd, buf)
rtest.OK(b, err) rtest.OK(b, err)
id, err := repo.SaveBlob(context.TODO(), restic.DataBlob, buf, restic.ID{}) id, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, buf, restic.ID{}, false)
rtest.OK(b, err) rtest.OK(b, err)
rtest.OK(b, repo.Flush(context.Background())) rtest.OK(b, repo.Flush(context.Background()))
@ -322,15 +322,17 @@ func saveRandomDataBlobs(t testing.TB, repo restic.Repository, num int, sizeMax
_, err := io.ReadFull(rnd, buf) _, err := io.ReadFull(rnd, buf)
rtest.OK(t, err) rtest.OK(t, err)
_, err = repo.SaveBlob(context.TODO(), restic.DataBlob, buf, restic.ID{}) _, _, err = repo.SaveBlob(context.TODO(), restic.DataBlob, buf, restic.ID{}, false)
rtest.OK(t, err) rtest.OK(t, err)
} }
} }
func TestRepositoryIncrementalIndex(t *testing.T) { func TestRepositoryIncrementalIndex(t *testing.T) {
repo, cleanup := repository.TestRepository(t) r, cleanup := repository.TestRepository(t)
defer cleanup() defer cleanup()
repo := r.(*repository.Repository)
repository.IndexFull = func(*repository.Index) bool { return true } repository.IndexFull = func(*repository.Index) bool { return true }
// add 15 packs // add 15 packs
@ -338,7 +340,7 @@ func TestRepositoryIncrementalIndex(t *testing.T) {
// add 3 packs, write intermediate index // add 3 packs, write intermediate index
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
saveRandomDataBlobs(t, repo, 5, 1<<15) saveRandomDataBlobs(t, repo, 5, 1<<15)
rtest.OK(t, repo.Flush(context.Background())) rtest.OK(t, repo.FlushPacks(context.Background()))
} }
rtest.OK(t, repo.SaveFullIndex(context.TODO())) rtest.OK(t, repo.SaveFullIndex(context.TODO()))
@ -347,7 +349,7 @@ func TestRepositoryIncrementalIndex(t *testing.T) {
// add another 5 packs // add another 5 packs
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
saveRandomDataBlobs(t, repo, 5, 1<<15) saveRandomDataBlobs(t, repo, 5, 1<<15)
rtest.OK(t, repo.Flush(context.Background())) rtest.OK(t, repo.FlushPacks(context.Background()))
} }
// save final index // save final index

View File

@ -46,7 +46,7 @@ type Repository interface {
LoadAndDecrypt(ctx context.Context, buf []byte, t FileType, id ID) (data []byte, err error) LoadAndDecrypt(ctx context.Context, buf []byte, t FileType, id ID) (data []byte, err error)
LoadBlob(context.Context, BlobType, ID, []byte) ([]byte, error) LoadBlob(context.Context, BlobType, ID, []byte) ([]byte, error)
SaveBlob(context.Context, BlobType, []byte, ID) (ID, error) SaveBlob(context.Context, BlobType, []byte, ID, bool) (ID, bool, error)
LoadTree(context.Context, ID) (*Tree, error) LoadTree(context.Context, ID) (*Tree, error)
SaveTree(context.Context, *Tree) (ID, error) SaveTree(context.Context, *Tree) (ID, error)

View File

@ -22,7 +22,6 @@ func fakeFile(seed, size int64) io.Reader {
type fakeFileSystem struct { type fakeFileSystem struct {
t testing.TB t testing.TB
repo Repository repo Repository
knownBlobs IDSet
duplication float32 duplication float32
buf []byte buf []byte
chunker *chunker.Chunker chunker *chunker.Chunker
@ -55,12 +54,11 @@ func (fs *fakeFileSystem) saveFile(ctx context.Context, rd io.Reader) (blobs IDs
id := Hash(chunk.Data) id := Hash(chunk.Data)
if !fs.blobIsKnown(id, DataBlob) { if !fs.blobIsKnown(id, DataBlob) {
_, err := fs.repo.SaveBlob(ctx, DataBlob, chunk.Data, id) _, _, err := fs.repo.SaveBlob(ctx, DataBlob, chunk.Data, id, true)
if err != nil { if err != nil {
fs.t.Fatalf("error saving chunk: %v", err) fs.t.Fatalf("error saving chunk: %v", err)
} }
fs.knownBlobs.Insert(id)
} }
blobs = append(blobs, id) blobs = append(blobs, id)
@ -92,15 +90,10 @@ func (fs *fakeFileSystem) blobIsKnown(id ID, t BlobType) bool {
return false return false
} }
if fs.knownBlobs.Has(id) {
return true
}
if fs.repo.Index().Has(id, t) { if fs.repo.Index().Has(id, t) {
return true return true
} }
fs.knownBlobs.Insert(id)
return false return false
} }
@ -147,7 +140,7 @@ func (fs *fakeFileSystem) saveTree(ctx context.Context, seed int64, depth int) I
return id return id
} }
_, err := fs.repo.SaveBlob(ctx, TreeBlob, buf, id) _, _, err := fs.repo.SaveBlob(ctx, TreeBlob, buf, id, false)
if err != nil { if err != nil {
fs.t.Fatal(err) fs.t.Fatal(err)
} }
@ -174,7 +167,6 @@ func TestCreateSnapshot(t testing.TB, repo Repository, at time.Time, depth int,
fs := fakeFileSystem{ fs := fakeFileSystem{
t: t, t: t,
repo: repo, repo: repo,
knownBlobs: NewIDSet(),
duplication: duplication, duplication: duplication,
rand: rand.New(rand.NewSource(seed)), rand: rand.New(rand.NewSource(seed)),
} }
@ -196,11 +188,6 @@ func TestCreateSnapshot(t testing.TB, repo Repository, at time.Time, depth int,
t.Fatal(err) t.Fatal(err)
} }
err = repo.SaveIndex(context.TODO())
if err != nil {
t.Fatal(err)
}
return snapshot return snapshot
} }

View File

@ -38,7 +38,7 @@ func saveFile(t testing.TB, repo restic.Repository, node File) restic.ID {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
id, err := repo.SaveBlob(ctx, restic.DataBlob, []byte(node.Data), restic.ID{}) id, _, err := repo.SaveBlob(ctx, restic.DataBlob, []byte(node.Data), restic.ID{}, false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -118,11 +118,6 @@ func saveSnapshot(t testing.TB, repo restic.Repository, snapshot Snapshot) (*res
t.Fatal(err) t.Fatal(err)
} }
err = repo.SaveIndex(ctx)
if err != nil {
t.Fatal(err)
}
sn, err := restic.NewSnapshot([]string{"test"}, nil, "", time.Now()) sn, err := restic.NewSnapshot([]string{"test"}, nil, "", time.Now())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)