Add CreateIndexFromPacks()

This commit is contained in:
Alexander Weiss 2020-10-10 21:31:40 +02:00
parent e8713bc209
commit 43732bb885
1 changed files with 69 additions and 0 deletions

View File

@ -7,6 +7,7 @@ import (
"fmt"
"io"
"os"
"sync"
"github.com/restic/chunker"
"github.com/restic/restic/internal/cache"
@ -17,6 +18,7 @@ import (
"github.com/restic/restic/internal/hashing"
"github.com/restic/restic/internal/pack"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui/progress"
"github.com/minio/sha256-simd"
"golang.org/x/sync/errgroup"
@ -515,6 +517,73 @@ func (r *Repository) LoadIndex(ctx context.Context) error {
return nil
}
const listPackParallelism = 10
// CreateIndexFromPacks creates a new index by reading all given pack files (with sizes).
// The index is added to the MasterIndex but not marked as finalized.
// Returned is the list of pack files which could not be read.
func (r *Repository) CreateIndexFromPacks(ctx context.Context, packsize map[restic.ID]int64, p *progress.Counter) (invalid restic.IDs, err error) {
var m sync.Mutex
debug.Log("Loading index from pack files")
// track spawned goroutines using wg, create a new context which is
// cancelled as soon as an error occurs.
wg, ctx := errgroup.WithContext(ctx)
type FileInfo struct {
restic.ID
Size int64
}
ch := make(chan FileInfo)
// send list of pack files through ch, which is closed afterwards
wg.Go(func() error {
defer close(ch)
for id, size := range packsize {
select {
case <-ctx.Done():
return nil
case ch <- FileInfo{id, size}:
}
}
return nil
})
idx := NewIndex()
// a worker receives an pack ID from ch, reads the pack contents, and adds them to idx
worker := func() error {
for fi := range ch {
entries, _, err := r.ListPack(ctx, fi.ID, fi.Size)
if err != nil {
debug.Log("unable to list pack file %v", fi.ID.Str())
m.Lock()
invalid = append(invalid, fi.ID)
m.Unlock()
}
idx.StorePack(fi.ID, entries)
p.Add(1)
}
return nil
}
// run workers on ch
wg.Go(func() error {
return RunWorkers(listPackParallelism, worker)
})
err = wg.Wait()
if err != nil {
return invalid, errors.Fatal(err.Error())
}
// Add idx to MasterIndex
r.idx.Insert(idx)
return invalid, nil
}
// PrepareCache initializes the local cache. indexIDs is the list of IDs of
// index files still present in the repo.
func (r *Repository) PrepareCache(indexIDs restic.IDSet) error {