diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 051b99252..f40ce9097 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "os" + "sync" "github.com/restic/chunker" "github.com/restic/restic/internal/cache" @@ -17,6 +18,7 @@ import ( "github.com/restic/restic/internal/hashing" "github.com/restic/restic/internal/pack" "github.com/restic/restic/internal/restic" + "github.com/restic/restic/internal/ui/progress" "github.com/minio/sha256-simd" "golang.org/x/sync/errgroup" @@ -515,6 +517,73 @@ func (r *Repository) LoadIndex(ctx context.Context) error { return nil } +const listPackParallelism = 10 + +// CreateIndexFromPacks creates a new index by reading all given pack files (with sizes). +// The index is added to the MasterIndex but not marked as finalized. +// Returned is the list of pack files which could not be read. +func (r *Repository) CreateIndexFromPacks(ctx context.Context, packsize map[restic.ID]int64, p *progress.Counter) (invalid restic.IDs, err error) { + var m sync.Mutex + + debug.Log("Loading index from pack files") + + // track spawned goroutines using wg, create a new context which is + // cancelled as soon as an error occurs. + wg, ctx := errgroup.WithContext(ctx) + + type FileInfo struct { + restic.ID + Size int64 + } + ch := make(chan FileInfo) + + // send list of pack files through ch, which is closed afterwards + wg.Go(func() error { + defer close(ch) + for id, size := range packsize { + select { + case <-ctx.Done(): + return nil + case ch <- FileInfo{id, size}: + } + } + return nil + }) + + idx := NewIndex() + // a worker receives an pack ID from ch, reads the pack contents, and adds them to idx + worker := func() error { + for fi := range ch { + entries, _, err := r.ListPack(ctx, fi.ID, fi.Size) + if err != nil { + debug.Log("unable to list pack file %v", fi.ID.Str()) + m.Lock() + invalid = append(invalid, fi.ID) + m.Unlock() + } + idx.StorePack(fi.ID, entries) + p.Add(1) + } + + return nil + } + + // run workers on ch + wg.Go(func() error { + return RunWorkers(listPackParallelism, worker) + }) + + err = wg.Wait() + if err != nil { + return invalid, errors.Fatal(err.Error()) + } + + // Add idx to MasterIndex + r.idx.Insert(idx) + + return invalid, nil +} + // PrepareCache initializes the local cache. indexIDs is the list of IDs of // index files still present in the repo. func (r *Repository) PrepareCache(indexIDs restic.IDSet) error {