
Merge pull request #3130 from aawsome/snapshots-parallel

Make loading snapshots parallel
Commit 0d5b764f90 by Alexander Neumann, 2020-12-06 21:08:18 +01:00, committed by GitHub
7 changed files with 129 additions and 120 deletions

View File

@@ -0,0 +1,9 @@
Enhancement: Parallelize reading of snapshots
Restic used to read snapshots sequentially. For repositories containing
many snapshots, this slowed down commands that have to read all snapshots.
Now the reading of snapshots is parallelized. This speeds up, for example,
`prune`, `backup`, and other commands that search for snapshots with certain
properties or that have to find the `latest` snapshot.
https://github.com/restic/restic/pull/3130
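
As a quick illustration of the new API (its signature appears in the snapshot changes further down in this commit), here is a minimal sketch of how a caller inside the restic module could collect all snapshots in parallel. The package and helper name are illustrative only, not part of restic:

package examples // hypothetical package inside the restic module

import (
    "context"

    "github.com/restic/restic/internal/restic"
)

// collectSnapshots gathers every snapshot via the new parallel reader.
// The callback is guaranteed not to run concurrently, so appending to the
// slice needs no locking; returning a non-nil error aborts the iteration.
func collectSnapshots(ctx context.Context, repo restic.Repository) (restic.Snapshots, error) {
    var snapshots restic.Snapshots
    err := restic.ForAllSnapshots(ctx, repo, nil, func(id restic.ID, sn *restic.Snapshot, err error) error {
        if err != nil {
            return err
        }
        snapshots = append(snapshots, sn)
        return nil
    })
    if err != nil {
        return nil, err
    }
    return snapshots, nil
}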

View File

@@ -55,8 +55,7 @@ func prettyPrintJSON(wr io.Writer, item interface{}) error {
}
func debugPrintSnapshots(ctx context.Context, repo *repository.Repository, wr io.Writer) error {
return repo.List(ctx, restic.SnapshotFile, func(id restic.ID, size int64) error {
snapshot, err := restic.LoadSnapshot(ctx, repo, id)
return restic.ForAllSnapshots(ctx, repo, nil, func(id restic.ID, snapshot *restic.Snapshot, err error) error {
if err != nil {
return err
}

View File

@@ -159,19 +159,13 @@ func runPruneWithRepo(opts PruneOptions, gopts GlobalOptions, repo *repository.R
Print("warning: running prune without a cache, this may be very slow!\n")
}
Verbosef("loading all snapshots...\n")
snapshots, err := restic.LoadAllSnapshots(gopts.ctx, repo, ignoreSnapshots)
if err != nil {
return err
}
Verbosef("loading indexes...\n")
err = repo.LoadIndex(gopts.ctx)
err := repo.LoadIndex(gopts.ctx)
if err != nil {
return err
}
usedBlobs, err := getUsedBlobs(gopts, repo, snapshots)
usedBlobs, err := getUsedBlobs(gopts, repo, ignoreSnapshots)
if err != nil {
return err
}
@@ -537,19 +531,34 @@ func rebuildIndexFiles(gopts GlobalOptions, repo restic.Repository, removePacks
return DeleteFilesChecked(gopts, repo, obsoleteIndexes, restic.IndexFile)
}
func getUsedBlobs(gopts GlobalOptions, repo restic.Repository, snapshots []*restic.Snapshot) (usedBlobs restic.BlobSet, err error) {
func getUsedBlobs(gopts GlobalOptions, repo restic.Repository, ignoreSnapshots restic.IDSet) (usedBlobs restic.BlobSet, err error) {
ctx := gopts.ctx
Verbosef("finding data that is still in use for %d snapshots\n", len(snapshots))
var snapshotTrees restic.IDs
Verbosef("loading all snapshots...\n")
err = restic.ForAllSnapshots(gopts.ctx, repo, ignoreSnapshots,
func(id restic.ID, sn *restic.Snapshot, err error) error {
debug.Log("add snapshot %v (tree %v, error %v)", id, *sn.Tree, err)
if err != nil {
return err
}
snapshotTrees = append(snapshotTrees, *sn.Tree)
return nil
})
if err != nil {
return nil, err
}
Verbosef("finding data that is still in use for %d snapshots\n", len(snapshotTrees))
usedBlobs = restic.NewBlobSet()
bar := newProgressMax(!gopts.Quiet, uint64(len(snapshots)), "snapshots")
bar := newProgressMax(!gopts.Quiet, uint64(len(snapshotTrees)), "snapshots")
defer bar.Done()
for _, sn := range snapshots {
debug.Log("process snapshot %v", sn.ID())
for _, tree := range snapshotTrees {
debug.Log("process tree %v", tree)
err = restic.FindUsedBlobs(ctx, repo, *sn.Tree, usedBlobs)
err = restic.FindUsedBlobs(ctx, repo, tree, usedBlobs)
if err != nil {
if repo.Backend().IsNotExist(err) {
return nil, errors.Fatal("unable to load a tree from the repo: " + err.Error())
@@ -558,7 +567,7 @@ func getUsedBlobs(gopts GlobalOptions, repo restic.Repository, snapshots []*rest
return nil, err
}
debug.Log("processed snapshot %v", sn.ID())
debug.Log("processed tree %v", tree)
bar.Add(1)
}
return usedBlobs, nil

View File

@@ -298,86 +298,6 @@ func (e Error) Error() string {
return e.Err.Error()
}
func loadTreeFromSnapshot(ctx context.Context, repo restic.Repository, id restic.ID) (restic.ID, error) {
sn, err := restic.LoadSnapshot(ctx, repo, id)
if err != nil {
debug.Log("error loading snapshot %v: %v", id, err)
return restic.ID{}, err
}
if sn.Tree == nil {
debug.Log("snapshot %v has no tree", id)
return restic.ID{}, errors.Errorf("snapshot %v has no tree", id)
}
return *sn.Tree, nil
}
// loadSnapshotTreeIDs loads all snapshots from backend and returns the tree IDs.
func loadSnapshotTreeIDs(ctx context.Context, repo restic.Repository) (restic.IDs, []error) {
var trees struct {
IDs restic.IDs
sync.Mutex
}
var errs struct {
errs []error
sync.Mutex
}
// track spawned goroutines using wg, create a new context which is
// cancelled as soon as an error occurs.
wg, ctx := errgroup.WithContext(ctx)
ch := make(chan restic.ID)
// send list of snapshot files through ch, which is closed afterwards
wg.Go(func() error {
defer close(ch)
return repo.List(ctx, restic.SnapshotFile, func(id restic.ID, size int64) error {
select {
case <-ctx.Done():
return nil
case ch <- id:
}
return nil
})
})
// a worker receives a snapshot ID from ch, loads the snapshot and the tree,
// and adds the result to errs and trees.
worker := func() error {
for id := range ch {
debug.Log("load snapshot %v", id)
treeID, err := loadTreeFromSnapshot(ctx, repo, id)
if err != nil {
errs.Lock()
errs.errs = append(errs.errs, err)
errs.Unlock()
continue
}
debug.Log("snapshot %v has tree %v", id, treeID)
trees.Lock()
trees.IDs = append(trees.IDs, treeID)
trees.Unlock()
}
return nil
}
for i := 0; i < defaultParallelism; i++ {
wg.Go(worker)
}
err := wg.Wait()
if err != nil {
errs.errs = append(errs.errs, err)
}
return trees.IDs, errs.errs
}
// TreeError collects several errors that occurred while processing a tree.
type TreeError struct {
ID restic.ID
@@ -586,6 +506,24 @@ func (c *Checker) filterTrees(ctx context.Context, backlog restic.IDs, loaderCha
}
}
func loadSnapshotTreeIDs(ctx context.Context, repo restic.Repository) (ids restic.IDs, errs []error) {
err := restic.ForAllSnapshots(ctx, repo, nil, func(id restic.ID, sn *restic.Snapshot, err error) error {
if err != nil {
errs = append(errs, err)
return nil
}
treeID := *sn.Tree
debug.Log("snapshot %v has tree %v", id, treeID)
ids = append(ids, treeID)
return nil
})
if err != nil {
errs = append(errs, err)
}
return ids, errs
}
// Structure checks that for all snapshots all referenced data blobs and
// subtrees are available in the index. errChan is closed after all trees have
// been traversed.
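
The new loadSnapshotTreeIDs above illustrates one of the two error policies used with ForAllSnapshots in this commit: it records load errors and returns nil so that checking continues, while the prune and debug callers return the error and abort the iteration. A self-contained sketch of both policies; the helper name and the keepGoing flag are illustrative, not part of restic:

package examples // hypothetical package inside the restic module

import (
    "context"

    "github.com/restic/restic/internal/restic"
)

// snapshotTreeIDs returns the tree IDs of all loadable snapshots. With
// keepGoing=true it mimics the checker: broken snapshots are recorded as
// errors and the iteration continues. With keepGoing=false the first load
// error cancels the remaining reads, as prune and the debug command do.
func snapshotTreeIDs(ctx context.Context, repo restic.Repository, keepGoing bool) (restic.IDs, []error) {
    var ids restic.IDs
    var errs []error
    err := restic.ForAllSnapshots(ctx, repo, nil, func(id restic.ID, sn *restic.Snapshot, err error) error {
        if err != nil {
            if keepGoing {
                errs = append(errs, err) // collect and keep going
                return nil
            }
            return err // cancel the whole iteration
        }
        ids = append(ids, *sn.Tree)
        return nil
    })
    if err != nil {
        errs = append(errs, err)
    }
    return ids, errs
}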

View File

@@ -5,8 +5,11 @@ import (
"fmt"
"os/user"
"path/filepath"
"sync"
"time"
"golang.org/x/sync/errgroup"
"github.com/restic/restic/internal/debug"
)
@@ -66,27 +69,61 @@ func LoadSnapshot(ctx context.Context, repo Repository, id ID) (*Snapshot, error
return sn, nil
}
// LoadAllSnapshots returns a list of all snapshots in the repo.
// If a snapshot ID is in excludeIDs, it will not be included in the result.
func LoadAllSnapshots(ctx context.Context, repo Repository, excludeIDs IDSet) (snapshots []*Snapshot, err error) {
err = repo.List(ctx, SnapshotFile, func(id ID, size int64) error {
if excludeIDs.Has(id) {
return nil
}
sn, err := LoadSnapshot(ctx, repo, id)
if err != nil {
return err
}
const loadSnapshotParallelism = 5
snapshots = append(snapshots, sn)
return nil
// ForAllSnapshots reads all snapshots in parallel and calls the
// given function for each. It is guaranteed that the function is not run
// concurrently. If the function returns an error, the remaining loads are
// cancelled and ForAllSnapshots returns that error.
// If a snapshot ID is in excludeIDs, it will be ignored.
func ForAllSnapshots(ctx context.Context, repo Repository, excludeIDs IDSet, fn func(ID, *Snapshot, error) error) error {
var m sync.Mutex
// track spawned goroutines using wg, create a new context which is
// cancelled as soon as an error occurs.
wg, ctx := errgroup.WithContext(ctx)
ch := make(chan ID)
// send list of snapshot files through ch, which is closed afterwards
wg.Go(func() error {
defer close(ch)
return repo.List(ctx, SnapshotFile, func(id ID, size int64) error {
if excludeIDs.Has(id) {
return nil
}
select {
case <-ctx.Done():
return nil
case ch <- id:
}
return nil
})
})
if err != nil {
return nil, err
// a worker receives a snapshot ID from ch, loads the snapshot,
// and runs fn with the id, the snapshot, and the load error
worker := func() error {
for id := range ch {
debug.Log("load snapshot %v", id)
sn, err := LoadSnapshot(ctx, repo, id)
m.Lock()
err = fn(id, sn, err)
m.Unlock()
if err != nil {
return err
}
}
return nil
}
return snapshots, nil
for i := 0; i < loadSnapshotParallelism; i++ {
wg.Go(worker)
}
return wg.Wait()
}
func (sn Snapshot) String() string {
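
The doc comment for ForAllSnapshots above guarantees that the callback never runs concurrently and that returning a non-nil error cancels the remaining loads. Here is a sketch of how a caller could rely on both guarantees to stop as soon as a matching snapshot is found; the sentinel error and helper name are assumptions for illustration, not part of restic:

package examples // hypothetical package inside the restic module

import (
    "context"
    "errors"

    "github.com/restic/restic/internal/restic"
)

// errFound is a sentinel used only to stop the iteration early.
var errFound = errors.New("snapshot found")

// firstSnapshotForHost returns some snapshot created on the given host, or
// nil if none exists. Writing to match without a lock is safe because the
// callback is serialized; returning errFound cancels the remaining loads.
func firstSnapshotForHost(ctx context.Context, repo restic.Repository, host string) (*restic.Snapshot, error) {
    var match *restic.Snapshot
    err := restic.ForAllSnapshots(ctx, repo, nil, func(id restic.ID, sn *restic.Snapshot, err error) error {
        if err != nil {
            return err
        }
        if sn.Hostname == host {
            match = sn
            return errFound
        }
        return nil
    })
    if err != nil && err != errFound {
        return nil, err
    }
    return match, nil
}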

View File

@@ -33,10 +33,9 @@ func FindLatestSnapshot(ctx context.Context, repo Repository, targets []string,
found bool
)
err = repo.List(ctx, SnapshotFile, func(snapshotID ID, size int64) error {
snapshot, err := LoadSnapshot(ctx, repo, snapshotID)
err = ForAllSnapshots(ctx, repo, nil, func(id ID, snapshot *Snapshot, err error) error {
if err != nil {
return errors.Errorf("Error loading snapshot %v: %v", snapshotID.Str(), err)
return errors.Errorf("Error loading snapshot %v: %v", id.Str(), err)
}
if snapshot.Time.Before(latest) {
@@ -56,7 +55,7 @@ func FindLatestSnapshot(ctx context.Context, repo Repository, targets []string,
}
latest = snapshot.Time
latestID = snapshotID
latestID = id
found = true
return nil
})
@@ -90,8 +89,7 @@ func FindSnapshot(ctx context.Context, repo Repository, s string) (ID, error) {
func FindFilteredSnapshots(ctx context.Context, repo Repository, hosts []string, tags []TagList, paths []string) (Snapshots, error) {
results := make(Snapshots, 0, 20)
err := repo.List(ctx, SnapshotFile, func(id ID, size int64) error {
sn, err := LoadSnapshot(ctx, repo, id)
err := ForAllSnapshots(ctx, repo, nil, func(id ID, sn *Snapshot, err error) error {
if err != nil {
fmt.Fprintf(os.Stderr, "could not load snapshot %v: %v\n", id.Str(), err)
return nil

View File

@@ -17,6 +17,25 @@ const (
testDepth = 2
)
// loadAllSnapshots returns a list of all snapshots in the repo.
// If a snapshot ID is in excludeIDs, it will not be included in the result.
func loadAllSnapshots(ctx context.Context, repo restic.Repository, excludeIDs restic.IDSet) (snapshots restic.Snapshots, err error) {
err = restic.ForAllSnapshots(ctx, repo, excludeIDs, func(id restic.ID, sn *restic.Snapshot, err error) error {
if err != nil {
return err
}
snapshots = append(snapshots, sn)
return nil
})
if err != nil {
return nil, err
}
return snapshots, nil
}
func TestCreateSnapshot(t *testing.T) {
repo, cleanup := repository.TestRepository(t)
defer cleanup()
@@ -25,7 +44,7 @@ func TestCreateSnapshot(t *testing.T) {
restic.TestCreateSnapshot(t, repo, testSnapshotTime.Add(time.Duration(i)*time.Second), testDepth, 0)
}
snapshots, err := restic.LoadAllSnapshots(context.TODO(), repo, restic.NewIDSet())
snapshots, err := loadAllSnapshots(context.TODO(), repo, restic.NewIDSet())
if err != nil {
t.Fatal(err)
}