mirror of
https://github.com/octoleo/restic.git
synced 2024-12-22 10:58:55 +00:00
Merge pull request #3880 from MichaelEischer/archiver-savedir-cleanup
archiver: Improve handling of "file xxx already present" error
This commit is contained in:
commit
1a6160d152
12
changelog/unreleased/issue-1866
Normal file
12
changelog/unreleased/issue-1866
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
Enhancement: Improve handling of directories with duplicate entries
|
||||||
|
|
||||||
|
If for some reason a directory contains a duplicate entry, this caused the
|
||||||
|
backup command to fail with a `node "path/to/file" already present` or `nodes
|
||||||
|
are not ordered got "path/to/file", last "path/to/file"` error.
|
||||||
|
|
||||||
|
The error handling has been changed to only report a warning in this case. Make
|
||||||
|
sure to check that the filesystem in question is not damaged!
|
||||||
|
|
||||||
|
https://github.com/restic/restic/issues/1866
|
||||||
|
https://github.com/restic/restic/issues/3937
|
||||||
|
https://github.com/restic/restic/pull/3880
|
@ -172,37 +172,14 @@ func (arch *Archiver) error(item string, err error) error {
|
|||||||
return errf
|
return errf
|
||||||
}
|
}
|
||||||
|
|
||||||
// saveTree stores a tree in the repo. It checks the index and the known blobs
|
|
||||||
// before saving anything.
|
|
||||||
func (arch *Archiver) saveTree(ctx context.Context, t *restic.TreeJSONBuilder) (restic.ID, ItemStats, error) {
|
|
||||||
var s ItemStats
|
|
||||||
buf, err := t.Finalize()
|
|
||||||
if err != nil {
|
|
||||||
return restic.ID{}, s, err
|
|
||||||
}
|
|
||||||
|
|
||||||
b := &Buffer{Data: buf}
|
|
||||||
res := arch.blobSaver.Save(ctx, restic.TreeBlob, b)
|
|
||||||
|
|
||||||
sbr := res.Take(ctx)
|
|
||||||
if !sbr.known {
|
|
||||||
s.TreeBlobs++
|
|
||||||
s.TreeSize += uint64(sbr.length)
|
|
||||||
s.TreeSizeInRepo += uint64(sbr.sizeInRepo)
|
|
||||||
}
|
|
||||||
// The context was canceled in the meantime, id might be invalid
|
|
||||||
if ctx.Err() != nil {
|
|
||||||
return restic.ID{}, s, ctx.Err()
|
|
||||||
}
|
|
||||||
return sbr.id, s, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// nodeFromFileInfo returns the restic node from an os.FileInfo.
|
// nodeFromFileInfo returns the restic node from an os.FileInfo.
|
||||||
func (arch *Archiver) nodeFromFileInfo(filename string, fi os.FileInfo) (*restic.Node, error) {
|
func (arch *Archiver) nodeFromFileInfo(snPath, filename string, fi os.FileInfo) (*restic.Node, error) {
|
||||||
node, err := restic.NodeFromFileInfo(filename, fi)
|
node, err := restic.NodeFromFileInfo(filename, fi)
|
||||||
if !arch.WithAtime {
|
if !arch.WithAtime {
|
||||||
node.AccessTime = node.ModTime
|
node.AccessTime = node.ModTime
|
||||||
}
|
}
|
||||||
|
// overwrite name to match that within the snapshot
|
||||||
|
node.Name = path.Base(snPath)
|
||||||
return node, errors.Wrap(err, "NodeFromFileInfo")
|
return node, errors.Wrap(err, "NodeFromFileInfo")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -237,7 +214,7 @@ func (arch *Archiver) wrapLoadTreeError(id restic.ID, err error) error {
|
|||||||
func (arch *Archiver) SaveDir(ctx context.Context, snPath string, dir string, fi os.FileInfo, previous *restic.Tree, complete CompleteFunc) (d FutureNode, err error) {
|
func (arch *Archiver) SaveDir(ctx context.Context, snPath string, dir string, fi os.FileInfo, previous *restic.Tree, complete CompleteFunc) (d FutureNode, err error) {
|
||||||
debug.Log("%v %v", snPath, dir)
|
debug.Log("%v %v", snPath, dir)
|
||||||
|
|
||||||
treeNode, err := arch.nodeFromFileInfo(dir, fi)
|
treeNode, err := arch.nodeFromFileInfo(snPath, dir, fi)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return FutureNode{}, err
|
return FutureNode{}, err
|
||||||
}
|
}
|
||||||
@ -329,7 +306,7 @@ func (fn *FutureNode) take(ctx context.Context) futureNodeResult {
|
|||||||
}
|
}
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
}
|
}
|
||||||
return futureNodeResult{}
|
return futureNodeResult{err: errors.Errorf("no result")}
|
||||||
}
|
}
|
||||||
|
|
||||||
// allBlobsPresent checks if all blobs (contents) of the given node are
|
// allBlobsPresent checks if all blobs (contents) of the given node are
|
||||||
@ -393,7 +370,7 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous
|
|||||||
debug.Log("%v hasn't changed, using old list of blobs", target)
|
debug.Log("%v hasn't changed, using old list of blobs", target)
|
||||||
arch.CompleteItem(snPath, previous, previous, ItemStats{}, time.Since(start))
|
arch.CompleteItem(snPath, previous, previous, ItemStats{}, time.Since(start))
|
||||||
arch.CompleteBlob(snPath, previous.Size)
|
arch.CompleteBlob(snPath, previous.Size)
|
||||||
node, err := arch.nodeFromFileInfo(target, fi)
|
node, err := arch.nodeFromFileInfo(snPath, target, fi)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return FutureNode{}, false, err
|
return FutureNode{}, false, err
|
||||||
}
|
}
|
||||||
@ -488,7 +465,7 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous
|
|||||||
default:
|
default:
|
||||||
debug.Log(" %v other", target)
|
debug.Log(" %v other", target)
|
||||||
|
|
||||||
node, err := arch.nodeFromFileInfo(target, fi)
|
node, err := arch.nodeFromFileInfo(snPath, target, fi)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return FutureNode{}, false, err
|
return FutureNode{}, false, err
|
||||||
}
|
}
|
||||||
@ -557,13 +534,32 @@ func (arch *Archiver) statDir(dir string) (os.FileInfo, error) {
|
|||||||
|
|
||||||
// SaveTree stores a Tree in the repo, returned is the tree. snPath is the path
|
// SaveTree stores a Tree in the repo, returned is the tree. snPath is the path
|
||||||
// within the current snapshot.
|
// within the current snapshot.
|
||||||
func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree, previous *restic.Tree) (*restic.Tree, error) {
|
func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree, previous *restic.Tree, complete CompleteFunc) (FutureNode, int, error) {
|
||||||
|
|
||||||
|
var node *restic.Node
|
||||||
|
if snPath != "/" {
|
||||||
|
if atree.FileInfoPath == "" {
|
||||||
|
return FutureNode{}, 0, errors.Errorf("FileInfoPath for %v is empty", snPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
fi, err := arch.statDir(atree.FileInfoPath)
|
||||||
|
if err != nil {
|
||||||
|
return FutureNode{}, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
debug.Log("%v, dir node data loaded from %v", snPath, atree.FileInfoPath)
|
||||||
|
node, err = arch.nodeFromFileInfo(snPath, atree.FileInfoPath, fi)
|
||||||
|
if err != nil {
|
||||||
|
return FutureNode{}, 0, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// fake root node
|
||||||
|
node = &restic.Node{}
|
||||||
|
}
|
||||||
|
|
||||||
debug.Log("%v (%v nodes), parent %v", snPath, len(atree.Nodes), previous)
|
debug.Log("%v (%v nodes), parent %v", snPath, len(atree.Nodes), previous)
|
||||||
|
|
||||||
nodeNames := atree.NodeNames()
|
nodeNames := atree.NodeNames()
|
||||||
tree := restic.NewTree(len(nodeNames))
|
nodes := make([]FutureNode, 0, len(nodeNames))
|
||||||
|
|
||||||
futureNodes := make(map[string]FutureNode)
|
|
||||||
|
|
||||||
// iterate over the nodes of atree in lexicographic (=deterministic) order
|
// iterate over the nodes of atree in lexicographic (=deterministic) order
|
||||||
for _, name := range nodeNames {
|
for _, name := range nodeNames {
|
||||||
@ -571,7 +567,7 @@ func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree,
|
|||||||
|
|
||||||
// test if context has been cancelled
|
// test if context has been cancelled
|
||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
return nil, ctx.Err()
|
return FutureNode{}, 0, ctx.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
// this is a leaf node
|
// this is a leaf node
|
||||||
@ -584,15 +580,15 @@ func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree,
|
|||||||
// ignore error
|
// ignore error
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return nil, err
|
return FutureNode{}, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return FutureNode{}, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if !excluded {
|
if !excluded {
|
||||||
futureNodes[name] = fn
|
nodes = append(nodes, fn)
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -606,85 +602,21 @@ func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree,
|
|||||||
err = arch.error(join(snPath, name), err)
|
err = arch.error(join(snPath, name), err)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return FutureNode{}, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// not a leaf node, archive subtree
|
// not a leaf node, archive subtree
|
||||||
subtree, err := arch.SaveTree(ctx, join(snPath, name), &subatree, oldSubtree)
|
fn, _, err := arch.SaveTree(ctx, join(snPath, name), &subatree, oldSubtree, func(n *restic.Node, is ItemStats) {
|
||||||
|
arch.CompleteItem(snItem, oldNode, n, is, time.Since(start))
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return FutureNode{}, 0, err
|
||||||
}
|
}
|
||||||
|
nodes = append(nodes, fn)
|
||||||
tb, err := restic.TreeToBuilder(subtree)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
id, nodeStats, err := arch.saveTree(ctx, tb)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if subatree.FileInfoPath == "" {
|
|
||||||
return nil, errors.Errorf("FileInfoPath for %v/%v is empty", snPath, name)
|
|
||||||
}
|
|
||||||
|
|
||||||
debug.Log("%v, saved subtree %v as %v", snPath, subtree, id.Str())
|
|
||||||
|
|
||||||
fi, err := arch.statDir(subatree.FileInfoPath)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
debug.Log("%v, dir node data loaded from %v", snPath, subatree.FileInfoPath)
|
|
||||||
|
|
||||||
node, err := arch.nodeFromFileInfo(subatree.FileInfoPath, fi)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
node.Name = name
|
|
||||||
node.Subtree = &id
|
|
||||||
|
|
||||||
err = tree.Insert(node)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
arch.CompleteItem(snItem, oldNode, node, nodeStats, time.Since(start))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
debug.Log("waiting on %d nodes", len(futureNodes))
|
fn := arch.treeSaver.Save(ctx, snPath, atree.FileInfoPath, node, nodes, complete)
|
||||||
|
return fn, len(nodes), nil
|
||||||
// process all futures
|
|
||||||
for name, fn := range futureNodes {
|
|
||||||
fnr := fn.take(ctx)
|
|
||||||
|
|
||||||
// return the error, or ignore it
|
|
||||||
if fnr.err != nil {
|
|
||||||
fnr.err = arch.error(fnr.target, fnr.err)
|
|
||||||
if fnr.err == nil {
|
|
||||||
// ignore error
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, fnr.err
|
|
||||||
}
|
|
||||||
|
|
||||||
// when the error is ignored, the node could not be saved, so ignore it
|
|
||||||
if fnr.node == nil {
|
|
||||||
debug.Log("%v excluded: %v", fnr.snPath, fnr.target)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
fnr.node.Name = name
|
|
||||||
|
|
||||||
err := tree.Insert(fnr.node)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return tree, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// flags are passed to fs.OpenFile. O_RDONLY is implied.
|
// flags are passed to fs.OpenFile. O_RDONLY is implied.
|
||||||
@ -786,7 +718,7 @@ func (arch *Archiver) runWorkers(ctx context.Context, wg *errgroup.Group) {
|
|||||||
arch.fileSaver.CompleteBlob = arch.CompleteBlob
|
arch.fileSaver.CompleteBlob = arch.CompleteBlob
|
||||||
arch.fileSaver.NodeFromFileInfo = arch.nodeFromFileInfo
|
arch.fileSaver.NodeFromFileInfo = arch.nodeFromFileInfo
|
||||||
|
|
||||||
arch.treeSaver = NewTreeSaver(ctx, wg, arch.Options.SaveTreeConcurrency, arch.saveTree, arch.Error)
|
arch.treeSaver = NewTreeSaver(ctx, wg, arch.Options.SaveTreeConcurrency, arch.blobSaver.Save, arch.Error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (arch *Archiver) stopWorkers() {
|
func (arch *Archiver) stopWorkers() {
|
||||||
@ -819,27 +751,33 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps
|
|||||||
wg, wgCtx := errgroup.WithContext(wgUpCtx)
|
wg, wgCtx := errgroup.WithContext(wgUpCtx)
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
var stats ItemStats
|
|
||||||
wg.Go(func() error {
|
wg.Go(func() error {
|
||||||
arch.runWorkers(wgCtx, wg)
|
arch.runWorkers(wgCtx, wg)
|
||||||
|
|
||||||
debug.Log("starting snapshot")
|
debug.Log("starting snapshot")
|
||||||
tree, err := arch.SaveTree(wgCtx, "/", atree, arch.loadParentTree(wgCtx, opts.ParentSnapshot))
|
fn, nodeCount, err := arch.SaveTree(wgCtx, "/", atree, arch.loadParentTree(wgCtx, opts.ParentSnapshot), func(n *restic.Node, is ItemStats) {
|
||||||
|
arch.CompleteItem("/", nil, nil, is, time.Since(start))
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(tree.Nodes) == 0 {
|
fnr := fn.take(wgCtx)
|
||||||
|
if fnr.err != nil {
|
||||||
|
return fnr.err
|
||||||
|
}
|
||||||
|
|
||||||
|
if wgCtx.Err() != nil {
|
||||||
|
return wgCtx.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
if nodeCount == 0 {
|
||||||
return errors.New("snapshot is empty")
|
return errors.New("snapshot is empty")
|
||||||
}
|
}
|
||||||
|
|
||||||
tb, err := restic.TreeToBuilder(tree)
|
rootTreeID = *fnr.node.Subtree
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
rootTreeID, stats, err = arch.saveTree(wgCtx, tb)
|
|
||||||
arch.stopWorkers()
|
arch.stopWorkers()
|
||||||
return err
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
err = wg.Wait()
|
err = wg.Wait()
|
||||||
@ -850,8 +788,6 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
arch.CompleteItem("/", nil, nil, stats, time.Since(start))
|
|
||||||
|
|
||||||
return arch.Repo.Flush(ctx)
|
return arch.Repo.Flush(ctx)
|
||||||
})
|
})
|
||||||
err = wgUp.Wait()
|
err = wgUp.Wait()
|
||||||
|
@ -1118,16 +1118,18 @@ func TestArchiverSaveTree(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
tree, err := arch.SaveTree(ctx, "/", atree, nil)
|
fn, _, err := arch.SaveTree(ctx, "/", atree, nil, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
treeID, err := restic.SaveTree(ctx, repo, tree)
|
fnr := fn.take(context.TODO())
|
||||||
if err != nil {
|
if fnr.err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(fnr.err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
treeID := *fnr.node.Subtree
|
||||||
|
|
||||||
arch.stopWorkers()
|
arch.stopWorkers()
|
||||||
err = repo.Flush(ctx)
|
err = repo.Flush(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -27,7 +27,7 @@ type FileSaver struct {
|
|||||||
|
|
||||||
CompleteBlob func(filename string, bytes uint64)
|
CompleteBlob func(filename string, bytes uint64)
|
||||||
|
|
||||||
NodeFromFileInfo func(filename string, fi os.FileInfo) (*restic.Node, error)
|
NodeFromFileInfo func(snPath, filename string, fi os.FileInfo) (*restic.Node, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewFileSaver returns a new file saver. A worker pool with fileWorkers is
|
// NewFileSaver returns a new file saver. A worker pool with fileWorkers is
|
||||||
@ -112,7 +112,7 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
|
|||||||
|
|
||||||
debug.Log("%v", snPath)
|
debug.Log("%v", snPath)
|
||||||
|
|
||||||
node, err := s.NodeFromFileInfo(f.Name(), fi)
|
node, err := s.NodeFromFileInfo(snPath, f.Name(), fi)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = f.Close()
|
_ = f.Close()
|
||||||
fnr.err = err
|
fnr.err = err
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"runtime"
|
"runtime"
|
||||||
"testing"
|
"testing"
|
||||||
@ -46,7 +47,9 @@ func startFileSaver(ctx context.Context, t testing.TB) (*FileSaver, context.Cont
|
|||||||
}
|
}
|
||||||
|
|
||||||
s := NewFileSaver(ctx, wg, saveBlob, pol, workers, workers)
|
s := NewFileSaver(ctx, wg, saveBlob, pol, workers, workers)
|
||||||
s.NodeFromFileInfo = restic.NodeFromFileInfo
|
s.NodeFromFileInfo = func(snPath, filename string, fi os.FileInfo) (*restic.Node, error) {
|
||||||
|
return restic.NodeFromFileInfo(filename, fi)
|
||||||
|
}
|
||||||
|
|
||||||
return s, ctx, wg
|
return s, ctx, wg
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,7 @@ package archiver
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
|
|
||||||
"github.com/restic/restic/internal/debug"
|
"github.com/restic/restic/internal/debug"
|
||||||
"github.com/restic/restic/internal/restic"
|
"github.com/restic/restic/internal/restic"
|
||||||
@ -10,7 +11,7 @@ import (
|
|||||||
|
|
||||||
// TreeSaver concurrently saves incoming trees to the repo.
|
// TreeSaver concurrently saves incoming trees to the repo.
|
||||||
type TreeSaver struct {
|
type TreeSaver struct {
|
||||||
saveTree func(context.Context, *restic.TreeJSONBuilder) (restic.ID, ItemStats, error)
|
saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob
|
||||||
errFn ErrorFunc
|
errFn ErrorFunc
|
||||||
|
|
||||||
ch chan<- saveTreeJob
|
ch chan<- saveTreeJob
|
||||||
@ -18,12 +19,12 @@ type TreeSaver struct {
|
|||||||
|
|
||||||
// NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is
|
// NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is
|
||||||
// started, it is stopped when ctx is cancelled.
|
// started, it is stopped when ctx is cancelled.
|
||||||
func NewTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, saveTree func(context.Context, *restic.TreeJSONBuilder) (restic.ID, ItemStats, error), errFn ErrorFunc) *TreeSaver {
|
func NewTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob, errFn ErrorFunc) *TreeSaver {
|
||||||
ch := make(chan saveTreeJob)
|
ch := make(chan saveTreeJob)
|
||||||
|
|
||||||
s := &TreeSaver{
|
s := &TreeSaver{
|
||||||
ch: ch,
|
ch: ch,
|
||||||
saveTree: saveTree,
|
saveBlob: saveBlob,
|
||||||
errFn: errFn,
|
errFn: errFn,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -79,6 +80,7 @@ func (s *TreeSaver) save(ctx context.Context, job *saveTreeJob) (*restic.Node, I
|
|||||||
job.nodes = nil
|
job.nodes = nil
|
||||||
|
|
||||||
builder := restic.NewTreeJSONBuilder()
|
builder := restic.NewTreeJSONBuilder()
|
||||||
|
var lastNode *restic.Node
|
||||||
|
|
||||||
for i, fn := range nodes {
|
for i, fn := range nodes {
|
||||||
// fn is a copy, so clear the original value explicitly
|
// fn is a copy, so clear the original value explicitly
|
||||||
@ -105,18 +107,37 @@ func (s *TreeSaver) save(ctx context.Context, job *saveTreeJob) (*restic.Node, I
|
|||||||
|
|
||||||
debug.Log("insert %v", fnr.node.Name)
|
debug.Log("insert %v", fnr.node.Name)
|
||||||
err := builder.AddNode(fnr.node)
|
err := builder.AddNode(fnr.node)
|
||||||
|
if err != nil && errors.Is(err, restic.ErrTreeNotOrdered) && lastNode != nil && fnr.node.Equals(*lastNode) {
|
||||||
|
// ignore error if an _identical_ node already exists, but nevertheless issue a warning
|
||||||
|
_ = s.errFn(fnr.target, err)
|
||||||
|
err = nil
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, stats, err
|
return nil, stats, err
|
||||||
}
|
}
|
||||||
|
lastNode = fnr.node
|
||||||
}
|
}
|
||||||
|
|
||||||
id, treeStats, err := s.saveTree(ctx, builder)
|
buf, err := builder.Finalize()
|
||||||
stats.Add(treeStats)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, stats, err
|
return nil, stats, err
|
||||||
}
|
}
|
||||||
|
|
||||||
node.Subtree = &id
|
b := &Buffer{Data: buf}
|
||||||
|
res := s.saveBlob(ctx, restic.TreeBlob, b)
|
||||||
|
|
||||||
|
sbr := res.Take(ctx)
|
||||||
|
if !sbr.known {
|
||||||
|
stats.TreeBlobs++
|
||||||
|
stats.TreeSize += uint64(sbr.length)
|
||||||
|
stats.TreeSizeInRepo += uint64(sbr.sizeInRepo)
|
||||||
|
}
|
||||||
|
// The context was canceled in the meantime, id might be invalid
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return nil, stats, ctx.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
node.Subtree = &sbr.id
|
||||||
return node, stats, nil
|
return node, stats, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4,29 +4,46 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"runtime"
|
"runtime"
|
||||||
"sync/atomic"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/restic/restic/internal/errors"
|
"github.com/restic/restic/internal/errors"
|
||||||
"github.com/restic/restic/internal/restic"
|
"github.com/restic/restic/internal/restic"
|
||||||
|
"github.com/restic/restic/internal/test"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestTreeSaver(t *testing.T) {
|
func treeSaveHelper(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ch := make(chan SaveBlobResponse, 1)
|
||||||
defer cancel()
|
ch <- SaveBlobResponse{
|
||||||
|
id: restic.NewRandomID(),
|
||||||
|
known: false,
|
||||||
|
length: len(buf.Data),
|
||||||
|
sizeInRepo: len(buf.Data),
|
||||||
|
}
|
||||||
|
return FutureBlob{ch: ch}
|
||||||
|
}
|
||||||
|
|
||||||
|
func setupTreeSaver() (context.Context, context.CancelFunc, *TreeSaver, func() error) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
wg, ctx := errgroup.WithContext(ctx)
|
wg, ctx := errgroup.WithContext(ctx)
|
||||||
|
|
||||||
saveFn := func(context.Context, *restic.TreeJSONBuilder) (restic.ID, ItemStats, error) {
|
|
||||||
return restic.NewRandomID(), ItemStats{TreeBlobs: 1, TreeSize: 123}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
errFn := func(snPath string, err error) error {
|
errFn := func(snPath string, err error) error {
|
||||||
return nil
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
b := NewTreeSaver(ctx, wg, uint(runtime.NumCPU()), saveFn, errFn)
|
b := NewTreeSaver(ctx, wg, uint(runtime.NumCPU()), treeSaveHelper, errFn)
|
||||||
|
|
||||||
|
shutdown := func() error {
|
||||||
|
b.TriggerShutdown()
|
||||||
|
return wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx, cancel, b, shutdown
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTreeSaver(t *testing.T) {
|
||||||
|
ctx, cancel, b, shutdown := setupTreeSaver()
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
var results []FutureNode
|
var results []FutureNode
|
||||||
|
|
||||||
@ -35,7 +52,7 @@ func TestTreeSaver(t *testing.T) {
|
|||||||
Name: fmt.Sprintf("file-%d", i),
|
Name: fmt.Sprintf("file-%d", i),
|
||||||
}
|
}
|
||||||
|
|
||||||
fb := b.Save(ctx, "/", node.Name, node, nil, nil)
|
fb := b.Save(ctx, join("/", node.Name), node.Name, node, nil, nil)
|
||||||
results = append(results, fb)
|
results = append(results, fb)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -43,9 +60,7 @@ func TestTreeSaver(t *testing.T) {
|
|||||||
tree.take(ctx)
|
tree.take(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
b.TriggerShutdown()
|
err := shutdown()
|
||||||
|
|
||||||
err := wg.Wait()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -54,7 +69,7 @@ func TestTreeSaver(t *testing.T) {
|
|||||||
func TestTreeSaverError(t *testing.T) {
|
func TestTreeSaverError(t *testing.T) {
|
||||||
var tests = []struct {
|
var tests = []struct {
|
||||||
trees int
|
trees int
|
||||||
failAt int32
|
failAt int
|
||||||
}{
|
}{
|
||||||
{1, 1},
|
{1, 1},
|
||||||
{20, 2},
|
{20, 2},
|
||||||
@ -67,36 +82,27 @@ func TestTreeSaverError(t *testing.T) {
|
|||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
t.Run("", func(t *testing.T) {
|
t.Run("", func(t *testing.T) {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel, b, shutdown := setupTreeSaver()
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
wg, ctx := errgroup.WithContext(ctx)
|
|
||||||
|
|
||||||
var num int32
|
|
||||||
saveFn := func(context.Context, *restic.TreeJSONBuilder) (restic.ID, ItemStats, error) {
|
|
||||||
val := atomic.AddInt32(&num, 1)
|
|
||||||
if val == test.failAt {
|
|
||||||
t.Logf("sending error for request %v\n", test.failAt)
|
|
||||||
return restic.ID{}, ItemStats{}, errTest
|
|
||||||
}
|
|
||||||
return restic.NewRandomID(), ItemStats{TreeBlobs: 1, TreeSize: 123}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
errFn := func(snPath string, err error) error {
|
|
||||||
t.Logf("ignoring error %v\n", err)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
b := NewTreeSaver(ctx, wg, uint(runtime.NumCPU()), saveFn, errFn)
|
|
||||||
|
|
||||||
var results []FutureNode
|
var results []FutureNode
|
||||||
|
|
||||||
for i := 0; i < test.trees; i++ {
|
for i := 0; i < test.trees; i++ {
|
||||||
node := &restic.Node{
|
node := &restic.Node{
|
||||||
Name: fmt.Sprintf("file-%d", i),
|
Name: fmt.Sprintf("file-%d", i),
|
||||||
}
|
}
|
||||||
|
nodes := []FutureNode{
|
||||||
|
newFutureNodeWithResult(futureNodeResult{node: &restic.Node{
|
||||||
|
Name: fmt.Sprintf("child-%d", i),
|
||||||
|
}}),
|
||||||
|
}
|
||||||
|
if (i + 1) == test.failAt {
|
||||||
|
nodes = append(nodes, newFutureNodeWithResult(futureNodeResult{
|
||||||
|
err: errTest,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
fb := b.Save(ctx, "/", node.Name, node, nil, nil)
|
fb := b.Save(ctx, join("/", node.Name), node.Name, node, nodes, nil)
|
||||||
results = append(results, fb)
|
results = append(results, fb)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -104,16 +110,51 @@ func TestTreeSaverError(t *testing.T) {
|
|||||||
tree.take(ctx)
|
tree.take(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
b.TriggerShutdown()
|
err := shutdown()
|
||||||
|
|
||||||
err := wg.Wait()
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("expected error not found")
|
t.Errorf("expected error not found")
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != errTest {
|
if err != errTest {
|
||||||
t.Fatalf("unexpected error found: %v", err)
|
t.Fatalf("unexpected error found: %v", err)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTreeSaverDuplicates(t *testing.T) {
|
||||||
|
for _, identicalNodes := range []bool{true, false} {
|
||||||
|
t.Run("", func(t *testing.T) {
|
||||||
|
ctx, cancel, b, shutdown := setupTreeSaver()
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
node := &restic.Node{
|
||||||
|
Name: "file",
|
||||||
|
}
|
||||||
|
nodes := []FutureNode{
|
||||||
|
newFutureNodeWithResult(futureNodeResult{node: &restic.Node{
|
||||||
|
Name: "child",
|
||||||
|
}}),
|
||||||
|
}
|
||||||
|
if identicalNodes {
|
||||||
|
nodes = append(nodes, newFutureNodeWithResult(futureNodeResult{node: &restic.Node{
|
||||||
|
Name: "child",
|
||||||
|
}}))
|
||||||
|
} else {
|
||||||
|
nodes = append(nodes, newFutureNodeWithResult(futureNodeResult{node: &restic.Node{
|
||||||
|
Name: "child",
|
||||||
|
Size: 42,
|
||||||
|
}}))
|
||||||
|
}
|
||||||
|
|
||||||
|
fb := b.Save(ctx, join("/", node.Name), node.Name, node, nodes, nil)
|
||||||
|
fb.take(ctx)
|
||||||
|
|
||||||
|
err := shutdown()
|
||||||
|
if identicalNodes {
|
||||||
|
test.Assert(t, err == nil, "unexpected error found: %v", err)
|
||||||
|
} else {
|
||||||
|
test.Assert(t, err != nil, "expected error not found")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -145,6 +145,8 @@ func SaveTree(ctx context.Context, r BlobSaver, t *Tree) (ID, error) {
|
|||||||
return id, err
|
return id, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var ErrTreeNotOrdered = errors.Errorf("nodes are not ordered or duplicate")
|
||||||
|
|
||||||
type TreeJSONBuilder struct {
|
type TreeJSONBuilder struct {
|
||||||
buf bytes.Buffer
|
buf bytes.Buffer
|
||||||
lastName string
|
lastName string
|
||||||
@ -158,7 +160,7 @@ func NewTreeJSONBuilder() *TreeJSONBuilder {
|
|||||||
|
|
||||||
func (builder *TreeJSONBuilder) AddNode(node *Node) error {
|
func (builder *TreeJSONBuilder) AddNode(node *Node) error {
|
||||||
if node.Name <= builder.lastName {
|
if node.Name <= builder.lastName {
|
||||||
return errors.Errorf("nodes are not ordered got %q, last %q", node.Name, builder.lastName)
|
return fmt.Errorf("node %q, last%q: %w", node.Name, builder.lastName, ErrTreeNotOrdered)
|
||||||
}
|
}
|
||||||
if builder.lastName != "" {
|
if builder.lastName != "" {
|
||||||
_ = builder.buf.WriteByte(',')
|
_ = builder.buf.WriteByte(',')
|
||||||
@ -182,14 +184,3 @@ func (builder *TreeJSONBuilder) Finalize() ([]byte, error) {
|
|||||||
builder.buf = bytes.Buffer{}
|
builder.buf = bytes.Buffer{}
|
||||||
return buf, nil
|
return buf, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func TreeToBuilder(t *Tree) (*TreeJSONBuilder, error) {
|
|
||||||
builder := NewTreeJSONBuilder()
|
|
||||||
for _, node := range t.Nodes {
|
|
||||||
err := builder.AddNode(node)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return builder, nil
|
|
||||||
}
|
|
||||||
|
@ -3,6 +3,7 @@ package restic_test
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
@ -136,6 +137,7 @@ func TestTreeEqualSerialization(t *testing.T) {
|
|||||||
|
|
||||||
rtest.Assert(t, tree.Insert(node) != nil, "no error on duplicate node")
|
rtest.Assert(t, tree.Insert(node) != nil, "no error on duplicate node")
|
||||||
rtest.Assert(t, builder.AddNode(node) != nil, "no error on duplicate node")
|
rtest.Assert(t, builder.AddNode(node) != nil, "no error on duplicate node")
|
||||||
|
rtest.Assert(t, errors.Is(builder.AddNode(node), restic.ErrTreeNotOrdered), "wrong error returned")
|
||||||
}
|
}
|
||||||
|
|
||||||
treeBytes, err := json.Marshal(tree)
|
treeBytes, err := json.Marshal(tree)
|
||||||
|
Loading…
Reference in New Issue
Block a user