restic/internal/index/master_index.go
commit 5f7b48e65f by Michael Eischer
index: replace Save() method with Rewrite and SaveFallback
Rewrite implements a streaming rewrite of the index that excludes the
given packs. For this it loads all index files from the repository and
only modifies those that require changes. This will reduce the index
churn when running prune. Rewrite does not require the in-memory index
and thus can drop it to significantly reduce the memory usage.

However, `prune --unsafe-recovery` cannot use this strategy and requires
a separate method to save the whole in-memory index. This is now handled
using SaveFallback.
2024-05-24 21:33:17 +02:00


package index

import (
	"bytes"
	"context"
	"fmt"
	"runtime"
	"sync"

	"github.com/restic/restic/internal/debug"
	"github.com/restic/restic/internal/restic"
	"github.com/restic/restic/internal/ui/progress"
	"golang.org/x/sync/errgroup"
)
// MasterIndex is a collection of indexes and IDs of chunks that are in the process of being saved.
type MasterIndex struct {
idx []*Index
pendingBlobs restic.BlobSet
idxMutex sync.RWMutex
compress bool
}
// NewMasterIndex creates a new master index.
func NewMasterIndex() *MasterIndex {
mi := &MasterIndex{pendingBlobs: restic.NewBlobSet()}
mi.clear()
return mi
}
func (mi *MasterIndex) clear() {
// Always add an empty final index, such that MergeFinalIndexes can merge into this.
mi.idx = []*Index{NewIndex()}
mi.idx[0].Finalize()
}
func (mi *MasterIndex) MarkCompressed() {
mi.compress = true
}
// Lookup queries all known Indexes for the ID and returns all matches.
func (mi *MasterIndex) Lookup(bh restic.BlobHandle) (pbs []restic.PackedBlob) {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
for _, idx := range mi.idx {
pbs = idx.Lookup(bh, pbs)
}
return pbs
}
// LookupSize queries all known Indexes for the ID and returns the first match.
func (mi *MasterIndex) LookupSize(bh restic.BlobHandle) (uint, bool) {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
for _, idx := range mi.idx {
if size, found := idx.LookupSize(bh); found {
return size, found
}
}
return 0, false
}
// AddPending adds a given blob to the list of pending blobs.
// Before doing so, it checks whether the blob is already known.
// It returns true if the blob was added and false if it was
// already known.
func (mi *MasterIndex) AddPending(bh restic.BlobHandle) bool {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
// Check if blob is pending or in index
if mi.pendingBlobs.Has(bh) {
return false
}
for _, idx := range mi.idx {
if idx.Has(bh) {
return false
}
}
// really not known -> insert
mi.pendingBlobs.Insert(bh)
return true
}
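// Hypothetical usage sketch, not part of the original file: AddPending lets an
// uploader claim a blob before storing it, so that concurrent uploaders do not
// store the same blob twice. Once the pack containing the blob has been
// written, StorePack clears the pending state again.
func examplePendingUpload(mi *MasterIndex, blob restic.Blob, packID restic.ID) {
	if !mi.AddPending(blob.BlobHandle) {
		// the blob is already stored or currently being stored elsewhere
		return
	}
	// ... write the blob into the pack file packID here ...
	mi.StorePack(packID, []restic.Blob{blob})
}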
// Has queries all known Indexes for the ID and reports whether it is known.
// It also returns true if the ID is pending.
func (mi *MasterIndex) Has(bh restic.BlobHandle) bool {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
// also return true if blob is pending
if mi.pendingBlobs.Has(bh) {
return true
}
for _, idx := range mi.idx {
if idx.Has(bh) {
return true
}
}
return false
}
// IDs returns the IDs of all final indexes contained in the MasterIndex.
func (mi *MasterIndex) IDs() restic.IDSet {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
ids := restic.NewIDSet()
for _, idx := range mi.idx {
if !idx.Final() {
continue
}
indexIDs, err := idx.IDs()
if err != nil {
debug.Log("not using index, ID() returned error %v", err)
continue
}
for _, id := range indexIDs {
ids.Insert(id)
}
}
return ids
}
// Packs returns all packs that are covered by the index.
// If packBlacklist is given, those packs are only contained in the
// resulting IDSet if they are contained in a non-final (newly written) index.
func (mi *MasterIndex) Packs(packBlacklist restic.IDSet) restic.IDSet {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
packs := restic.NewIDSet()
for _, idx := range mi.idx {
idxPacks := idx.Packs()
if idx.final && len(packBlacklist) > 0 {
idxPacks = idxPacks.Sub(packBlacklist)
}
packs.Merge(idxPacks)
}
return packs
}
// Insert adds a new index to the MasterIndex.
func (mi *MasterIndex) Insert(idx *Index) {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
mi.idx = append(mi.idx, idx)
}
// StorePack remembers the id and pack in the index.
func (mi *MasterIndex) StorePack(id restic.ID, blobs []restic.Blob) {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
// delete blobs from pending
for _, blob := range blobs {
mi.pendingBlobs.Delete(restic.BlobHandle{Type: blob.Type, ID: blob.ID})
}
for _, idx := range mi.idx {
if !idx.Final() {
idx.StorePack(id, blobs)
return
}
}
newIdx := NewIndex()
newIdx.StorePack(id, blobs)
mi.idx = append(mi.idx, newIdx)
}
// finalizeNotFinalIndexes finalizes all indexes that
// have not yet been saved and returns that list
func (mi *MasterIndex) finalizeNotFinalIndexes() []*Index {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
var list []*Index
for _, idx := range mi.idx {
if !idx.Final() {
idx.Finalize()
list = append(list, idx)
}
}
debug.Log("return %d indexes", len(list))
return list
}
// finalizeFullIndexes finalizes all indexes that are full and returns that list.
func (mi *MasterIndex) finalizeFullIndexes() []*Index {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
var list []*Index
debug.Log("checking %d indexes", len(mi.idx))
for _, idx := range mi.idx {
if idx.Final() {
continue
}
if IndexFull(idx, mi.compress) {
debug.Log("index %p is full", idx)
idx.Finalize()
list = append(list, idx)
} else {
debug.Log("index %p not full", idx)
}
}
debug.Log("return %d indexes", len(list))
return list
}
// Each runs fn on all blobs known to the index. When the context is cancelled,
// the index iteration returns immediately. This blocks any modification of the index.
func (mi *MasterIndex) Each(ctx context.Context, fn func(restic.PackedBlob)) error {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
for _, idx := range mi.idx {
if err := idx.Each(ctx, fn); err != nil {
return err
}
}
return nil
}
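// Hypothetical usage sketch, not part of the original file: count the blobs of
// each type known to the index via Each. Since Each holds a read lock for the
// whole iteration, fn must not modify the MasterIndex.
func exampleCountBlobs(ctx context.Context, mi *MasterIndex) (map[restic.BlobType]int, error) {
	counts := make(map[restic.BlobType]int)
	err := mi.Each(ctx, func(pb restic.PackedBlob) {
		counts[pb.Type]++
	})
	return counts, err
}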
// MergeFinalIndexes merges all final indexes together.
// After calling, there will be only one big final index in MasterIndex
// containing all final index contents.
// Indexes that are not final are left untouched.
// MergeFinalIndexes can only be called after all index files have been loaded,
// as removing superseded index contents is only possible for unmerged indexes.
func (mi *MasterIndex) MergeFinalIndexes() error {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
// The first index is always final and the one to merge into
newIdx := mi.idx[:1]
for i := 1; i < len(mi.idx); i++ {
idx := mi.idx[i]
// clear reference in masterindex as it may become stale
mi.idx[i] = nil
// do not merge indexes that have no id set
ids, _ := idx.IDs()
if !idx.Final() || len(ids) == 0 {
newIdx = append(newIdx, idx)
} else {
err := mi.idx[0].merge(idx)
if err != nil {
return fmt.Errorf("MergeFinalIndexes: %w", err)
}
}
}
mi.idx = newIdx
return nil
}
func (mi *MasterIndex) Load(ctx context.Context, r restic.ListerLoaderUnpacked, p *progress.Counter, cb func(id restic.ID, idx *Index, oldFormat bool, err error) error) error {
indexList, err := restic.MemorizeList(ctx, r, restic.IndexFile)
if err != nil {
return err
}
if p != nil {
var numIndexFiles uint64
err := indexList.List(ctx, restic.IndexFile, func(_ restic.ID, _ int64) error {
numIndexFiles++
return nil
})
if err != nil {
return err
}
p.SetMax(numIndexFiles)
defer p.Done()
}
err = ForAllIndexes(ctx, indexList, r, func(id restic.ID, idx *Index, oldFormat bool, err error) error {
if p != nil {
p.Add(1)
}
if cb != nil {
err = cb(id, idx, oldFormat, err)
}
if err != nil {
return err
}
// special case to allow check to ignore index loading errors
if idx == nil {
return nil
}
mi.Insert(idx)
return nil
})
if err != nil {
return err
}
return mi.MergeFinalIndexes()
}
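// Hypothetical usage sketch, not part of the original file: load all index
// files, skipping unreadable ones. When the callback swallows the error, Load
// continues with the next file and simply does not insert the nil index (see
// the special case above).
func exampleLoadLenient(ctx context.Context, mi *MasterIndex, r restic.ListerLoaderUnpacked) error {
	return mi.Load(ctx, r, nil, func(id restic.ID, _ *Index, _ bool, err error) error {
		if err != nil {
			debug.Log("ignoring broken index file %v: %v", id.Str(), err)
		}
		return nil
	})
}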
type MasterIndexRewriteOpts struct {
SaveProgress *progress.Counter
DeleteProgress func() *progress.Counter
DeleteReport func(id restic.ID, err error)
}
// Rewrite removes packs whose ID is in excludePacks from all known indexes.
// It also removes the rewritten index files and those listed in extraObsolete.
// If oldIndexes is not nil, then only the indexes in this set are processed.
// This is used by repair index to only rewrite and delete the old indexes.
//
// Must not be called concurrently with any other MasterIndex operation.
func (mi *MasterIndex) Rewrite(ctx context.Context, repo restic.Unpacked, excludePacks restic.IDSet, oldIndexes restic.IDSet, extraObsolete restic.IDs, opts MasterIndexRewriteOpts) error {
for _, idx := range mi.idx {
if !idx.Final() {
panic("internal error - index must be saved before calling MasterIndex.Rewrite")
}
}
var indexes restic.IDSet
if oldIndexes != nil {
// repair index adds new index entries for already existing pack files;
// remove only the old (possibly broken) entries by processing just the old indexes
indexes = oldIndexes
} else {
indexes = mi.IDs()
}
p := opts.SaveProgress
p.SetMax(uint64(len(indexes)))
// reset state that is not necessary for Rewrite and just consumes a lot of memory;
// the index state would be invalid after Rewrite completes anyway
mi.clear()
runtime.GC()
// copy excludePacks to prevent unintended side effects
excludePacks = excludePacks.Clone()
debug.Log("start rebuilding index of %d indexes, excludePacks: %v", len(indexes), excludePacks)
wg, wgCtx := errgroup.WithContext(ctx)
idxCh := make(chan restic.ID)
wg.Go(func() error {
defer close(idxCh)
for id := range indexes {
select {
case idxCh <- id:
case <-wgCtx.Done():
return wgCtx.Err()
}
}
return nil
})
var rewriteWg sync.WaitGroup
type rewriteTask struct {
idx *Index
oldFormat bool
}
rewriteCh := make(chan rewriteTask)
loader := func() error {
defer rewriteWg.Done()
for id := range idxCh {
buf, err := repo.LoadUnpacked(wgCtx, restic.IndexFile, id)
if err != nil {
return fmt.Errorf("LoadUnpacked(%v): %w", id.Str(), err)
}
idx, oldFormat, err := DecodeIndex(buf, id)
if err != nil {
return err
}
select {
case rewriteCh <- rewriteTask{idx, oldFormat}:
case <-wgCtx.Done():
return wgCtx.Err()
}
}
return nil
}
// loading an index can take quite some time and may be either CPU- or IO-bound
loaderCount := int(repo.Connections()) + runtime.GOMAXPROCS(0)
// run the loaders on idxCh
for i := 0; i < loaderCount; i++ {
rewriteWg.Add(1)
wg.Go(loader)
}
wg.Go(func() error {
rewriteWg.Wait()
close(rewriteCh)
return nil
})
obsolete := restic.NewIDSet(extraObsolete...)
saveCh := make(chan *Index)
wg.Go(func() error {
defer close(saveCh)
newIndex := NewIndex()
for task := range rewriteCh {
// always rewrite indexes that use the old format, that include a pack that must be removed, or that are not full
if !task.oldFormat && len(task.idx.Packs().Intersect(excludePacks)) == 0 && IndexFull(task.idx, mi.compress) {
// make sure that each pack is only stored exactly once in the index
excludePacks.Merge(task.idx.Packs())
// index is already up to date
p.Add(1)
continue
}
ids, err := task.idx.IDs()
if err != nil || len(ids) != 1 {
panic("internal error, index has no ID")
}
obsolete.Merge(restic.NewIDSet(ids...))
for pbs := range task.idx.EachByPack(wgCtx, excludePacks) {
newIndex.StorePack(pbs.PackID, pbs.Blobs)
if IndexFull(newIndex, mi.compress) {
select {
case saveCh <- newIndex:
case <-wgCtx.Done():
return wgCtx.Err()
}
newIndex = NewIndex()
}
}
if wgCtx.Err() != nil {
return wgCtx.Err()
}
// make sure that each pack is only stored exactly once in the index
excludePacks.Merge(task.idx.Packs())
p.Add(1)
}
select {
case saveCh <- newIndex:
case <-wgCtx.Done():
}
return nil
})
// a worker receives an index from saveCh and saves it
worker := func() error {
for idx := range saveCh {
idx.Finalize()
if _, err := SaveIndex(wgCtx, repo, idx); err != nil {
return err
}
}
return nil
}
// encoding an index can take quite some time and may be either CPU- or IO-bound
workerCount := int(repo.Connections()) + runtime.GOMAXPROCS(0)
// run the save workers on saveCh
for i := 0; i < workerCount; i++ {
wg.Go(worker)
}
err := wg.Wait()
p.Done()
if err != nil {
return fmt.Errorf("failed to rewrite indexes: %w", err)
}
p = nil
if opts.DeleteProgress != nil {
p = opts.DeleteProgress()
}
defer p.Done()
return restic.ParallelRemove(ctx, repo, obsolete, restic.IndexFile, func(id restic.ID, err error) error {
if opts.DeleteReport != nil {
opts.DeleteReport(id, err)
}
return err
}, p)
}
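// Hypothetical usage sketch, not part of the original file: drop a set of pack
// files from the index the way a prune-like caller might. A zero
// MasterIndexRewriteOpts is assumed to be acceptable here since Rewrite itself
// calls Done on a possibly nil progress counter.
func exampleRewrite(ctx context.Context, mi *MasterIndex, repo restic.Unpacked, removedPacks restic.IDSet) error {
	return mi.Rewrite(ctx, repo, removedPacks, nil, nil, MasterIndexRewriteOpts{})
}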
// SaveFallback saves all known indexes to index files, leaving out any
// packs whose ID is contained in excludePacks from finalized indexes.
// It is only intended for use by prune with the UnsafeRecovery option.
//
// Must not be called concurrently with any other MasterIndex operation.
func (mi *MasterIndex) SaveFallback(ctx context.Context, repo restic.SaverRemoverUnpacked, excludePacks restic.IDSet, p *progress.Counter) error {
p.SetMax(uint64(len(mi.Packs(excludePacks))))
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
debug.Log("start rebuilding index of %d indexes, excludePacks: %v", len(mi.idx), excludePacks)
obsolete := restic.NewIDSet()
wg, wgCtx := errgroup.WithContext(ctx)
ch := make(chan *Index)
wg.Go(func() error {
defer close(ch)
newIndex := NewIndex()
for _, idx := range mi.idx {
if idx.Final() {
ids, err := idx.IDs()
if err != nil {
panic("internal error - finalized index without ID")
}
debug.Log("adding index ids %v to supersedes field", ids)
obsolete.Merge(restic.NewIDSet(ids...))
}
for pbs := range idx.EachByPack(wgCtx, excludePacks) {
newIndex.StorePack(pbs.PackID, pbs.Blobs)
p.Add(1)
if IndexFull(newIndex, mi.compress) {
select {
case ch <- newIndex:
case <-wgCtx.Done():
return wgCtx.Err()
}
newIndex = NewIndex()
}
}
if wgCtx.Err() != nil {
return wgCtx.Err()
}
}
select {
case ch <- newIndex:
case <-wgCtx.Done():
}
return nil
})
// a worker receives an index from ch and saves it
worker := func() error {
for idx := range ch {
idx.Finalize()
if _, err := SaveIndex(wgCtx, repo, idx); err != nil {
return err
}
}
return nil
}
// keep concurrency bounded as we're on a fallback path
workerCount := int(repo.Connections())
// run workers on ch
for i := 0; i < workerCount; i++ {
wg.Go(worker)
}
err := wg.Wait()
p.Done()
// the in-memory index no longer matches the stored state
mi.clear()
return err
}
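// Hypothetical usage sketch, not part of the original file: the recovery path
// that saves the complete in-memory index instead of rewriting the index
// files, as used by `prune --unsafe-recovery`. Passing nil for the progress
// counter assumes the progress.Counter methods tolerate a nil receiver.
func exampleSaveFallback(ctx context.Context, mi *MasterIndex, repo restic.SaverRemoverUnpacked, removedPacks restic.IDSet) error {
	return mi.SaveFallback(ctx, repo, removedPacks, nil)
}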
// SaveIndex saves an index in the repository.
func SaveIndex(ctx context.Context, repo restic.SaverUnpacked, index *Index) (restic.ID, error) {
buf := bytes.NewBuffer(nil)
err := index.Encode(buf)
if err != nil {
return restic.ID{}, err
}
id, err := repo.SaveUnpacked(ctx, restic.IndexFile, buf.Bytes())
ierr := index.SetID(id)
if ierr != nil {
// logic bug
panic(ierr)
}
return id, err
}
// saveIndex saves the given indexes in the backend.
func (mi *MasterIndex) saveIndex(ctx context.Context, r restic.SaverUnpacked, indexes ...*Index) error {
for i, idx := range indexes {
debug.Log("Saving index %d", i)
sid, err := SaveIndex(ctx, r, idx)
if err != nil {
return err
}
debug.Log("Saved index %d as %v", i, sid)
}
return mi.MergeFinalIndexes()
}
// SaveIndex saves all new indexes in the backend.
func (mi *MasterIndex) SaveIndex(ctx context.Context, r restic.SaverUnpacked) error {
return mi.saveIndex(ctx, r, mi.finalizeNotFinalIndexes()...)
}
// SaveFullIndex saves all full indexes in the backend.
func (mi *MasterIndex) SaveFullIndex(ctx context.Context, r restic.SaverUnpacked) error {
return mi.saveIndex(ctx, r, mi.finalizeFullIndexes()...)
}
// ListPacks returns the blobs of the specified pack files grouped by pack file.
func (mi *MasterIndex) ListPacks(ctx context.Context, packs restic.IDSet) <-chan restic.PackBlobs {
out := make(chan restic.PackBlobs)
go func() {
defer close(out)
// regroup only a part of the index at a time (packs are split into 16 batches by the low nibble of the first byte of their ID) to keep the memory overhead bounded
for i := byte(0); i < 16; i++ {
packBlob := make(map[restic.ID][]restic.Blob)
for pack := range packs {
if pack[0]&0xf == i {
packBlob[pack] = nil
}
}
if len(packBlob) == 0 {
continue
}
err := mi.Each(ctx, func(pb restic.PackedBlob) {
if packs.Has(pb.PackID) && pb.PackID[0]&0xf == i {
packBlob[pb.PackID] = append(packBlob[pb.PackID], pb.Blob)
}
})
if err != nil {
return
}
// pass on packs
for packID, pbs := range packBlob {
// allow GC
packBlob[packID] = nil
select {
case out <- restic.PackBlobs{PackID: packID, Blobs: pbs}:
case <-ctx.Done():
return
}
}
}
}()
return out
}
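// Hypothetical usage sketch, not part of the original file: stream the blob
// lists of a set of pack files. The returned channel is closed once all
// requested packs have been sent or the context has been cancelled.
func exampleListPacks(ctx context.Context, mi *MasterIndex, packs restic.IDSet) {
	for pbs := range mi.ListPacks(ctx, packs) {
		debug.Log("pack %v contains %d blobs", pbs.PackID.Str(), len(pbs.Blobs))
	}
}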