Add Masterindex.Save(); Add Index.Packs()

This commit is contained in:
Alexander Weiss 2020-10-10 07:42:22 +02:00 committed by Alexander Neumann
parent 7f6f31c34b
commit 38cc4393f6
4 changed files with 178 additions and 34 deletions

View File

@ -275,6 +275,55 @@ func (idx *Index) Each(ctx context.Context) <-chan restic.PackedBlob {
return ch
}
type EachByPackResult struct {
packID restic.ID
blobs []restic.Blob
}
// EachByPack returns a channel that yields all blobs known to the index
// grouped by packID but ignoring blobs with a packID in packPlacklist.
// When the context is cancelled, the background goroutine
// terminates. This blocks any modification of the index.
func (idx *Index) EachByPack(ctx context.Context, packBlacklist restic.IDSet) <-chan EachByPackResult {
idx.m.Lock()
ch := make(chan EachByPackResult)
go func() {
defer idx.m.Unlock()
defer func() {
close(ch)
}()
for typ := range idx.byType {
byPack := make(map[restic.ID][]*indexEntry)
m := &idx.byType[typ]
m.foreach(func(e *indexEntry) bool {
packID := idx.packs[e.packIndex]
if !packBlacklist.Has(packID) {
byPack[packID] = append(byPack[packID], e)
}
return true
})
for packID, pack := range byPack {
var result EachByPackResult
result.packID = packID
for _, e := range pack {
result.blobs = append(result.blobs, idx.toPackedBlob(e, restic.BlobType(typ)).Blob)
}
select {
case <-ctx.Done():
return
case ch <- result:
}
}
}
}()
return ch
}
// Packs returns all packs in this index
func (idx *Index) Packs() restic.IDSet {
idx.m.Lock()

View File

@ -97,6 +97,19 @@ func (mi *MasterIndex) Has(id restic.ID, tpe restic.BlobType) bool {
return false
}
// Packs returns all packs that are covered by the index.
func (mi *MasterIndex) Packs() restic.IDSet {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
packs := restic.NewIDSet()
for _, idx := range mi.idx {
packs.Merge(idx.Packs())
}
return packs
}
// Count returns the number of blobs of type t in the index.
func (mi *MasterIndex) Count(t restic.BlobType) (n uint) {
mi.idxMutex.RLock()
@ -248,49 +261,66 @@ func (mi *MasterIndex) MergeFinalIndexes() {
mi.idx = newIdx
}
// RebuildIndex combines all known indexes to a new index, leaving out any
// Save saves all known indexes to index files, leaving out any
// packs whose ID is contained in packBlacklist. The new index contains the IDs
// of all known indexes in the "supersedes" field.
func (mi *MasterIndex) RebuildIndex(ctx context.Context, packBlacklist restic.IDSet) (*Index, error) {
// of all known indexes in the "supersedes" field. The IDs are also returned in
// the IDSet obsolete
// After calling this function, you should remove the obsolete index files.
func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, packBlacklist restic.IDSet, p *restic.Progress) (obsolete restic.IDSet, err error) {
p.Start()
defer p.Done()
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
debug.Log("start rebuilding index of %d indexes, pack blacklist: %v", len(mi.idx), packBlacklist)
newIndex := NewIndex()
obsolete = restic.NewIDSet()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for i, idx := range mi.idx {
debug.Log("adding index %d", i)
for pb := range idx.Each(ctx) {
if packBlacklist.Has(pb.PackID) {
continue
}
newIndex.Store(pb)
}
if !idx.Final() {
debug.Log("index %d isn't final, don't add to supersedes field", i)
continue
}
ids, err := idx.IDs()
if err != nil {
debug.Log("index %d does not have an ID: %v", err)
return nil, err
}
debug.Log("adding index ids %v to supersedes field", ids)
err = newIndex.AddToSupersedes(ids...)
if err != nil {
return nil, err
finalize := func() error {
newIndex.Finalize()
if _, err := SaveIndex(ctx, repo, newIndex); err != nil {
return err
}
newIndex = NewIndex()
return nil
}
return newIndex, nil
for i, idx := range mi.idx {
if idx.Final() {
ids, err := idx.IDs()
if err != nil {
debug.Log("index %d does not have an ID: %v", err)
return nil, err
}
debug.Log("adding index ids %v to supersedes field", ids)
err = newIndex.AddToSupersedes(ids...)
if err != nil {
return nil, err
}
obsolete.Merge(restic.NewIDSet(ids...))
} else {
debug.Log("index %d isn't final, don't add to supersedes field", i)
}
debug.Log("adding index %d", i)
for pbs := range idx.EachByPack(ctx, packBlacklist) {
newIndex.StorePack(pbs.packID, pbs.blobs)
p.Report(restic.Stat{Blobs: 1})
if IndexFull(newIndex) {
if err := finalize(); err != nil {
return nil, err
}
}
}
}
if err := finalize(); err != nil {
return nil, err
}
return
}

View File

@ -5,7 +5,9 @@ import (
"fmt"
"math/rand"
"testing"
"time"
"github.com/restic/restic/internal/checker"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
@ -322,3 +324,65 @@ func BenchmarkMasterIndexLookupBlobSize(b *testing.B) {
mIdx.LookupSize(lookupID, restic.DataBlob)
}
}
var (
snapshotTime = time.Unix(1470492820, 207401672)
depth = 3
)
func createFilledRepo(t testing.TB, snapshots int, dup float32) (restic.Repository, func()) {
repo, cleanup := repository.TestRepository(t)
for i := 0; i < 3; i++ {
restic.TestCreateSnapshot(t, repo, snapshotTime.Add(time.Duration(i)*time.Second), depth, dup)
}
return repo, cleanup
}
func TestIndexSave(t *testing.T) {
repo, cleanup := createFilledRepo(t, 3, 0)
defer cleanup()
repo.LoadIndex(context.TODO())
obsoletes, err := repo.Index().(*repository.MasterIndex).Save(context.TODO(), repo, nil, nil)
if err != nil {
t.Fatalf("unable to save new index: %v", err)
}
for id := range obsoletes {
t.Logf("remove index %v", id.Str())
h := restic.Handle{Type: restic.IndexFile, Name: id.String()}
err = repo.Backend().Remove(context.TODO(), h)
if err != nil {
t.Errorf("error removing index %v: %v", id, err)
}
}
checker := checker.New(repo)
hints, errs := checker.LoadIndex(context.TODO())
for _, h := range hints {
t.Logf("hint: %v\n", h)
}
for _, err := range errs {
t.Errorf("checker found error: %v", err)
}
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
errCh := make(chan error)
go checker.Structure(ctx, errCh)
i := 0
for err := range errCh {
t.Errorf("checker returned error: %v", err)
i++
if i == 10 {
t.Errorf("more than 10 errors returned, skipping the rest")
cancel()
break
}
}
}

View File

@ -62,6 +62,7 @@ type MasterIndex interface {
Has(ID, BlobType) bool
Lookup(ID, BlobType) []PackedBlob
Count(BlobType) uint
Packs() IDSet
// Each returns a channel that yields all blobs known to the index. When
// the context is cancelled, the background goroutine terminates. This